sondehub-infra/lambda/predict/__init__.py

124 lines
3.8 KiB
Python
Raw Permalink Normal View History

2021-02-22 06:13:30 +00:00
import json
2021-04-04 00:47:31 +00:00
import logging
import gzip
from io import BytesIO
2021-09-17 13:50:36 +00:00
import base64
2021-12-20 06:02:02 +00:00
import es
2021-02-22 06:13:30 +00:00
2021-04-04 00:47:31 +00:00
2021-02-22 06:13:30 +00:00
def _build_search_payload(vehicles=""):
    """Build the search body that selects the newest prediction per serial.

    The query aggregates on ``serial.keyword`` (up to 1000 buckets, keys in
    descending order) and keeps a single ``top_hits`` document per bucket,
    sorted by ``datetime`` descending.

    Args:
        vehicles: Comma-separated serials to restrict the search to. When
            empty, no serial restriction is added and only documents whose
            ``datetime`` lies within the last six hours are considered.

    Returns:
        A ``dict`` ready to be JSON-encoded and sent to the ``_search``
        endpoint.
    """
    payload = {
        "aggs": {
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {
                        "_key": "desc"
                    },
                    "size": 1000
                },
                "aggs": {
                    "1": {
                        "top_hits": {
                            "_source": True,
                            "size": 1,
                            "sort": [
                                {
                                    "datetime": {
                                        "order": "desc"
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        },
        "size": 0,
        "stored_fields": [
            "*"
        ],
        "script_fields": {},
        "docvalue_fields": [
            {
                "field": "datetime",
                "format": "date_time"
            }
        ],
        "_source": {
            "excludes": []
        },
        "query": {
            "bool": {
                "must": [],
                "filter": [
                    {
                        "range": {
                            "datetime": {
                                "gte": "now-6h",
                                "lte": "now",
                                "format": "strict_date_optional_time"
                            }
                        }
                    }
                ],
                "should": [],
                "must_not": []
            }
        }
    }

    if vehicles:
        bool_query = payload["query"]["bool"]
        bool_query["should"].extend(
            {"term": {"serial.keyword": serial}}
            for serial in vehicles.split(",")
        )
        # For explicitly requested sondes allow longer predictions: drop the
        # six-hour window so older documents can match as well.
        # NOTE(review): with no must/filter clauses left, the serial
        # restriction relies on Elasticsearch's default minimum_should_match
        # for should-only bool queries -- confirm against the cluster version.
        bool_query["filter"].pop(0)

    return payload


def _to_prediction(data):
    """Map one prediction ``_source`` document onto the response record.

    ``position`` is read as ``[longitude, latitude]`` (index 0 is emitted as
    longitude, index 1 as latitude). ``descending`` and ``landed`` are
    emitted as 1/0 integers, and the nested ``data`` field is emitted as a
    JSON-encoded string.

    Raises:
        KeyError: If the document lacks one of the expected fields.
    """
    return {
        "vehicle": data["serial"],
        "time": data["datetime"],
        "latitude": data["position"][1],
        "longitude": data["position"][0],
        "altitude": data["altitude"],
        "ascent_rate": data["ascent_rate"],
        "descent_rate": data["descent_rate"],
        "burst_altitude": data["burst_altitude"],
        # Compared against True on purpose: truthy non-boolean values (for
        # example a non-empty string) must not be reported as set.
        "descending": 1 if data["descending"] == True else 0,  # noqa: E712
        "landed": 1 if data["landed"] == True else 0,  # noqa: E712
        "data": json.dumps(data["data"])
    }


def predict(event, context):
    """Lambda handler returning the latest prediction for each sonde.

    Args:
        event: Invocation event. If it carries
            ``queryStringParameters["vehicles"]`` with a value other than
            ``""`` or ``"RS_*;*chase"``, that value is split on commas and
            the search is restricted to those serials with no time window.
            Otherwise every serial with a prediction in the last six hours
            is returned.
        context: Lambda context object; not used.

    Returns:
        A proxy-integration style response ``dict`` with status 200 whose
        body is the gzip-compressed, base64-encoded JSON list of prediction
        records.
    """
    path = "predictions-*/_search"

    # The key can be present with a null value (API Gateway proxy events do
    # this when the request has no query string), so normalise to a dict
    # before looking anything up instead of relying on key presence alone.
    params = event.get("queryStringParameters") or {}
    vehicles = params.get("vehicles") or ""
    # This literal is treated the same as "no vehicles requested".
    # NOTE(review): presumably the tracker's default wildcard -- confirm.
    if vehicles == "RS_*;*chase":
        vehicles = ""

    payload = _build_search_payload(vehicles)

    logging.debug("Start ES Request")
    results = es.request(json.dumps(payload), path, "GET")
    logging.debug("Finished ES Request")

    # Each bucket holds exactly one top hit: the newest document per serial.
    output = [
        _to_prediction(sonde["1"]["hits"]["hits"][0]["_source"])
        for sonde in results["aggregations"]["2"]["buckets"]
    ]

    gzipped_response = gzip.compress(json.dumps(output).encode("utf-8"))
    return {
        "body": base64.b64encode(gzipped_response).decode(),
        "isBase64Encoded": True,
        "statusCode": 200,
        "headers": {
            "Content-Encoding": "gzip",
            "content-type": "application/json"
        }
    }