# Source: sondehub-infra/lambda/reverse_predict/__init__.py (Python; 145 lines, 4.4 KiB as retrieved)

import json
from datetime import datetime, timedelta, timezone
import logging
import gzip
from io import BytesIO
import base64
import es
def predict(event, context):
    """Return the newest reverse-prediction document per serial, gzipped.

    Lambda handler that queries the ``reverse-prediction-*`` indices through
    ``es.request`` and returns, for every serial found, the most recent
    document (by ``datetime``).

    Query string parameters (all optional, read from
    ``event["queryStringParameters"]``):
        duration: One of the keys of the look-back table below. Defaults to
            ``"6h"``.
        datetime: ISO 8601 timestamp marking the end of the look-back window
            (a trailing ``Z`` is accepted). Defaults to the current UTC time.
        vehicles: Comma-separated serials. When supplied (and neither empty
            nor the literal ``"RS_*;*chase"``), only those serials are
            matched and the time window is not applied.

    Args:
        event: Lambda event dict.
        context: Lambda context object (unused).

    Returns:
        A plain error string when ``duration`` or ``datetime`` is invalid.
        Otherwise a response dict whose ``body`` is the base64-encoded,
        gzip-compressed JSON object mapping each serial to its newest
        document.
    """
    path = "reverse-prediction-*/_search"

    # Look-back windows are predefined rather than free-form so a request
    # cannot ask Elasticsearch for an arbitrarily large amount of data.
    # Values are (window seconds, interval seconds); only the window is used
    # by this handler.
    durations = {
        "3d": (259200, 1200),  # 3d, 20m
        "1d": (86400, 600),  # 1d, 10m
        "12h": (43200, 600),  # 12h, 10m
        "6h": (21600, 120),  # 6h, 2m
        "3h": (10800, 60),  # 3h, 1m
        "1h": (3600, 40),
        "30m": (1800, 20),
        "1m": (60, 1),
        "15s": (15, 1),
        "0": (0, 1),  # for getting a single time point
    }

    # The key may be missing or explicitly None when the request carries no
    # query string; treat both the same as "no parameters".
    params = event.get("queryStringParameters") or {}

    duration_query = params.get("duration", "6h")
    if duration_query not in durations:
        return f"Duration must be either {', '.join(durations.keys())}"

    if "datetime" in params:
        try:
            requested_time = datetime.fromisoformat(
                params["datetime"].replace("Z", "+00:00")
            )
        except (AttributeError, ValueError):
            # Mirror the duration check: report bad input instead of crashing.
            return (
                "Datetime must be an ISO 8601 timestamp, "
                "e.g. 2021-02-01T12:00:00Z"
            )
    else:
        requested_time = datetime.now(timezone.utc)

    window_seconds = durations[duration_query][0]
    # Upper bound is exclusive, so add a second to include requested_time.
    lt = requested_time + timedelta(seconds=1)
    gte = requested_time - timedelta(seconds=window_seconds)

    vehicles = params.get("vehicles")
    if vehicles and vehicles != "RS_*;*chase":
        # Explicit serials requested: match only those and drop the time
        # window so longer predictions can be returned for them.
        serial_clauses = [
            {"term": {"serial.keyword": serial}}
            for serial in vehicles.split(",")
        ]
        filters = []
    else:
        serial_clauses = []
        filters = [
            {
                "range": {
                    "datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
                }
            }
        ]

    payload = {
        "aggs": {
            # One bucket per serial ...
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {"_key": "desc"},
                    "size": 1000,
                },
                "aggs": {
                    # ... holding only that serial's newest document.
                    "1": {
                        "top_hits": {
                            "_source": True,
                            "size": 1,
                            "sort": [{"datetime": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
        "size": 0,
        "stored_fields": ["*"],
        "script_fields": {},
        "docvalue_fields": [{"field": "datetime", "format": "date_time"}],
        "_source": {"excludes": []},
        "query": {
            "bool": {
                "must": [],
                "filter": filters,
                "should": serial_clauses,
                "must_not": [],
            }
        },
    }

    logging.debug("Start ES Request")
    results = es.request(json.dumps(payload), path, "GET")
    logging.debug("Finished ES Request")

    output = {}
    for bucket in results["aggregations"]["2"]["buckets"]:
        newest = bucket["1"]["hits"]["hits"][0]["_source"]
        output[newest["serial"]] = newest

    gzipped_response = gzip.compress(json.dumps(output).encode("utf-8"))
    return {
        "body": base64.b64encode(gzipped_response).decode(),
        "isBase64Encoded": True,
        "statusCode": 200,
        "headers": {
            "Content-Encoding": "gzip",
            "content-type": "application/json",
        },
    }