sondehub-infra/query/lambda_function.py

195 lines
6.5 KiB
Python
Raw Normal View History

2021-02-02 07:14:38 +00:00
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
2021-02-02 10:44:39 +00:00
HOST = os.getenv("ES")
2021-02-02 07:14:38 +00:00
# get current sondes, filter by date, location
2021-02-02 10:44:39 +00:00
2021-02-02 07:14:38 +00:00
def get_sondes(event, context):
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
2021-02-02 10:44:39 +00:00
"terms": {
"field": "serial.keyword",
"order": {"_key": "desc"},
"size": 10000,
2021-02-02 07:14:38 +00:00
},
2021-02-02 10:44:39 +00:00
"aggs": {
"1": {
"top_hits": {
"size": 1,
"sort": [{"datetime": {"order": "desc"}}],
2021-02-02 07:14:38 +00:00
}
}
2021-02-02 10:44:39 +00:00
},
2021-02-02 07:14:38 +00:00
}
},
2021-02-02 10:44:39 +00:00
"query": {"bool": {"filter": [{"match_all": {}}]}},
2021-02-02 07:14:38 +00:00
}
# add filters
if "queryStringParameters" in event:
if "last" in event["queryStringParameters"]:
payload["query"]["bool"]["filter"].append(
{
"range": {
"datetime": {
"gte": f"now-{int(event['queryStringParameters']['last'])}s",
2021-02-02 10:44:39 +00:00
"lte": "now",
2021-02-02 07:14:38 +00:00
}
}
}
)
2021-02-02 10:44:39 +00:00
if (
"lat" in event["queryStringParameters"]
and "lon" in event["queryStringParameters"]
and "distance" in event["queryStringParameters"]
):
2021-02-02 07:14:38 +00:00
payload["query"]["bool"]["filter"].append(
{
"geo_distance": {
"distance": f"{int(event['queryStringParameters']['distance'])}m",
"position": {
2021-02-02 10:44:39 +00:00
"lat": float(event["queryStringParameters"]["lat"]),
"lon": float(event["queryStringParameters"]["lon"]),
},
2021-02-02 07:14:38 +00:00
}
}
)
# if the user doesn't specify a range we should add one - 24 hours is probably a good start
if "range" not in payload["query"]["bool"]["filter"]:
payload["query"]["bool"]["filter"].append(
2021-02-02 10:44:39 +00:00
{"range": {"datetime": {"gte": "now-1d", "lte": "now"}}}
)
results = es_request(payload, path, "POST")
buckets = results["aggregations"]["2"]["buckets"]
sondes = {
bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"][
"hits"
][0]["_source"]
for bucket in buckets
}
return json.dumps(sondes)
def get_telem(event, context):
durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES
"3d": (259200, 1200), # 3d, 20m
"1d": (86400, 600), # 1d, 10m
"6h": (21600, 60), # 6h, 1m
"3h": (10800, 1), # 3h, 1s
}
duration_query = "3h"
if (
"queryStringParameters" in event
and "duration" in event["queryStringParameters"]
):
if event["queryStringParameters"]["duration"] in durations:
duration_query = event["queryStringParameters"]["duration"]
else:
return f"Duration must be either {', '.join(durations.keys())}"
(duration, interval) = durations[duration_query]
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
"terms": {
"field": "serial.keyword",
"order": {"_key": "desc"},
"size": 10000,
},
"aggs": {
"3": {
"date_histogram": {
"field": "datetime",
"fixed_interval": f"{str(interval)}s",
"time_zone": "Australia/Brisbane",
"min_doc_count": 1,
},
"aggs": {
"1": {
"top_hits": {
"docvalue_fields": [
{"field": "position"},
{"field": "alt"},
{"field": "datetime"},
],
"_source": "position",
"size": 1,
"sort": [{"datetime": {"order": "desc"}}],
}
}
},
}
},
}
},
"query": {
"bool": {
"filter": [
{"match_all": {}},
{
"range": {
"datetime": {"gte": f"now-{str(duration)}s", "lt": "now"}
2021-02-02 07:14:38 +00:00
}
}
2021-02-02 10:44:39 +00:00
]
}
},
}
if "queryStringParameters" in event:
if "serial" in event["queryStringParameters"]:
print("test")
payload["query"]["bool"]["filter"].append(
{
"match_phrase": {
"serial": str(event["queryStringParameters"]["serial"])
}
2021-02-02 07:14:38 +00:00
}
)
2021-02-02 10:44:39 +00:00
print(payload)
2021-02-02 07:14:38 +00:00
results = es_request(payload, path, "POST")
2021-02-02 10:44:39 +00:00
output = {
sonde["key"]: {
data["key_as_string"]: {
field: data["1"]["hits"]["hits"][0]["fields"][field][0]
for field in data["1"]["hits"]["hits"][0]["fields"]
}
for data in sonde["3"]["buckets"]
}
for sonde in results["aggregations"]["2"]["buckets"]
}
return json.dumps(output)
2021-02-02 07:14:38 +00:00
def es_request(payload, path, method):
2021-02-02 10:44:39 +00:00
# get aws creds
2021-02-02 07:14:38 +00:00
session = boto3.Session()
params = json.dumps(payload)
2021-02-02 10:44:39 +00:00
headers = {"Host": HOST, "Content-Type": "application/json"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
2021-02-02 07:14:38 +00:00
session = URLLib3Session()
r = session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
2021-02-02 10:44:39 +00:00
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
print(get_telem({"queryStringParameters":{"serial": "R4450388", "duration": "3h"}}, {}))