From 8d2fbcb1647ce390da2aa5b9ee8fa4493e310e1b Mon Sep 17 00:00:00 2001 From: Michaela Date: Sun, 1 Aug 2021 23:47:51 +1000 Subject: [PATCH] started on historic work. get_telem tweaks and predict to our own hosted --- historic.tf | 0 historic/historic_es_to_s3/index.py | 137 ++++++++++++++++++++++++++++ historic/queue_data_update/index.py | 87 ++++++++++++++++++ predict/lambda_function.py | 2 +- query/lambda_function.py | 30 +++--- 5 files changed, 239 insertions(+), 17 deletions(-) create mode 100644 historic.tf create mode 100644 historic/historic_es_to_s3/index.py create mode 100644 historic/queue_data_update/index.py diff --git a/historic.tf b/historic.tf new file mode 100644 index 0000000..e69de29 diff --git a/historic/historic_es_to_s3/index.py b/historic/historic_es_to_s3/index.py new file mode 100644 index 0000000..f9948c5 --- /dev/null +++ b/historic/historic_es_to_s3/index.py @@ -0,0 +1,137 @@ +import json +from botocore.awsrequest import AWSRequest +from botocore.endpoint import URLLib3Session +from botocore.auth import SigV4Auth +import boto3 +import botocore.credentials +import os +import gzip +from botocore.exceptions import ClientError + +HOST = os.getenv("ES") +BUCKET = "sondehub-history" + +s3 = boto3.resource('s3') + +def es_request(payload, path, method, params=None): + # get aws creds + session = boto3.Session() + + headers = {"Host": HOST, "Content-Type": "application/json"} + request = AWSRequest( + method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers, params=params + ) + SigV4Auth(boto3.Session().get_credentials(), + "es", "us-east-1").add_auth(request) + + session = URLLib3Session() + r = session.send(request.prepare()) + return json.loads(r.text) + + +def fetch_es(serial): + payload = { + "size": 10000, + "sort": [ + { + "datetime": { + "order": "desc", + "unmapped_type": "boolean" + } + } + ], + + "query": { + "bool": { + "filter": [ + { + "match_phrase": { + "serial.keyword": serial + } + } + ] + } + } + } + data = [] + response = es_request(json.dumps(payload), + "telm-*/_search", "POST", params={"scroll": "1m"}) + data += [x["_source"] for x in response['hits']['hits']] + scroll_id = response['_scroll_id'] + while response['hits']['hits']: + response = es_request(json.dumps({"scroll": "1m", "scroll_id": scroll_id }), + "_search/scroll", "POST") + scroll_id = response['_scroll_id'] + data += [x["_source"] for x in response['hits']['hits']] + return data + +def fetch_s3(serial): + try: + object = s3.Object(BUCKET,f'serial/{serial}.json.gz') + with gzip.GzipFile(fileobj=object.get()["Body"]) as gzipfile: + return json.loads(gzipfile.read().decode("utf-8")) + except ClientError as ex: + if ex.response['Error']['Code'] == 'NoSuchKey': + return [] + else: + raise + +def write_s3(serial, data): + #get max alt + max_alt = sorted(data, key=lambda k: int(k['alt']))[-1] + summary = [ + data[0], + max_alt, + data[-1] + ] + dates = set([x['datetime'].split("T")[0].replace("-","/") for x in data]) + for date in dates: + object = s3.Object(BUCKET,f'date/{date}/{serial}.json') + object.put( + Body=json.dumps(summary).encode("utf-8"), + Metadata={ + "first-lat": str(summary[0]['lat']), + "first-lon": str(summary[0]['lon']), + "first-alt": str(summary[0]['alt']), + "max-lat": str(summary[1]['lat']), + "max-lon": str(summary[1]['lon']), + "max-alt": str(summary[1]['alt']), + "last-lat": str(summary[2]['lat']), + "last-lon": str(summary[2]['lon']), + "last-alt": str(summary[2]['alt']) + } + ) + gz_data = gzip.compress(json.dumps(data).encode('utf-8')) + object = s3.Object(BUCKET,f'serial/{serial}.json.gz') + object.put( + Body=gz_data, + ContentType='application/x-gzip', + Metadata={ + "first-lat": str(summary[0]['lat']), + "first-lon": str(summary[0]['lon']), + "first-alt": str(summary[0]['alt']), + "max-lat": str(summary[1]['lat']), + "max-lon": str(summary[1]['lon']), + "max-alt": str(summary[1]['alt']), + "last-lat": str(summary[2]['lat']), + "last-lon": str(summary[2]['lon']), + "last-alt": str(summary[2]['alt']) + } + ) + +def lambda_handler(event, context): + payloads = {} + for record in event['Records']: + sns_message = json.loads(record["body"]) + serial = sns_message["Message"] + s3_data = fetch_s3(serial) + es = fetch_es(serial) + data = s3_data + es + data = [dict(t) for t in {tuple(d.items()) for d in data}] + data = sorted(data, key=lambda k: k['datetime']) # sort by datetime + write_s3(serial, data) + + +if __name__ == "__main__": + print(lambda_handler( + {'Records': [{"body": "{\"Message\":\"S4520727\"}"}]}, {})) diff --git a/historic/queue_data_update/index.py b/historic/queue_data_update/index.py new file mode 100644 index 0000000..159bdd7 --- /dev/null +++ b/historic/queue_data_update/index.py @@ -0,0 +1,87 @@ +import json +import boto3 +import botocore.credentials +from botocore.awsrequest import AWSRequest +from botocore.endpoint import URLLib3Session +from botocore.auth import SigV4Auth + +import zlib +import base64 +import datetime +import os + +HOST = os.getenv("ES") + +sqs = boto3.client('sqs', region_name="us-east-1") + +def batch(iterable, n=1): + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx:min(ndx + n, l)] + +def es_request(payload, path, method): + session = boto3.Session() + + params = json.dumps(payload) + headers = {"Host": HOST, "Content-Type": "application/json"} + request = AWSRequest( + method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers + ) + SigV4Auth(boto3.Session().get_credentials(), + "es", "us-east-1").add_auth(request) + + session = URLLib3Session() + r = session.send(request.prepare()) + return json.loads(r.text) + + +def handler(event, context): + query = { + "aggs": { + "serials": { + "terms": { + "field": "serial.keyword", + "size": 5000 + } + } + }, + "size": 0, + "_source": { + "excludes": [] + }, + "query": { + "bool": { + "must_not": [{"match_phrase": {"serial": "xxxxxxxx"}}], + "filter": [ + { + "range": { + "datetime": { + "gte": "2021-07-23T06:45:17.308Z", + "lte": "2021-07-25T06:45:17.308Z", + "format": "strict_date_optional_time" + } + } + } + ] + } + } + } + results = es_request(query, "telm-*/_search", "POST") + serials = [ x['key'] for x in results['aggregations']['serials']['buckets'] ] + for serial_batch in batch(serials, 10): + sqs.send_message_batch( + QueueUrl="https://sqs.us-east-1.amazonaws.com/143841941773/update-history", + Entries=[ + { + "Id": x, + "MessageBody": x + } + for x in serial_batch] + ) + return [ x['key'] for x in results['aggregations']['serials']['buckets'] ] + #TODO add to SQS queue + +if __name__ == "__main__": + print(handler({}, {})) + +# this script will find list of sondes seen in the last 48 hours and add them to the queue to be updated (including the first and last date they were seen) diff --git a/predict/lambda_function.py b/predict/lambda_function.py index bb8d819..d441afd 100644 --- a/predict/lambda_function.py +++ b/predict/lambda_function.py @@ -253,7 +253,7 @@ def predict(event, context): except: pass - conn = http.client.HTTPSConnection("predict.cusf.co.uk") + conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org") serial_data={} logging.debug("Start Predict") for serial in serials: diff --git a/query/lambda_function.py b/query/lambda_function.py index 2d33e93..afaaaeb 100644 --- a/query/lambda_function.py +++ b/query/lambda_function.py @@ -94,9 +94,9 @@ def get_telem(event, context): "1d": (86400, 600), # 1d, 10m "12h": (43200, 600), # 1d, 10m "6h": (21600, 60), # 6h, 1m - "3h": (10800, 15), # 3h, 10s - "1h": (3600, 15), - "30m": (3600, 5), + "3h": (10800, 30), # 3h, 10s + "1h": (3600, 20), + "30m": (1800, 10), "1m": (60, 1), "15s": (15, 1), "0": (0, 1) # for getting a single time point @@ -127,9 +127,10 @@ def get_telem(event, context): lt = requested_time + timedelta(0, 1) gte = requested_time - timedelta(0, duration) - path = "telm-*/_search" + path = f"telm-{lt.year:2}-{lt.month:02},telm-{gte.year:2}-{gte.month:02}/_search" payload = { "timeout": "30s", + "size": 0, "aggs": { "2": { "terms": { @@ -153,13 +154,10 @@ def get_telem(event, context): # {"field": "datetime"}, # ], # "_source": "position", - "size": 5, + "size": 10 if (duration == 0 ) else 1, "sort": [ {"datetime": {"order": "desc"}}, - {"pressure": {"order": "desc","mode" : "median"}}, - {"humidity": {"order": "desc","mode" : "median"}}, - {"temp": {"order": "desc","mode" : "median"}}, - {"alt": {"order": "desc","mode" : "median"}} + {"pressure": {"order": "desc","mode" : "median"}} ], } } @@ -228,10 +226,10 @@ def get_listener_telemetry(event, context): "3d": (259200, 1200), # 3d, 20m "1d": (86400, 600), # 1d, 10m "12h": (43200, 600), # 1d, 10m - "6h": (21600, 60), # 6h, 1m - "3h": (10800, 15), # 3h, 10s - "1h": (3600, 15), - "30m": (3600, 5), + "6h": (21600, 120), # 6h, 1m + "3h": (10800, 90), # 3h, 10s + "1h": (3600, 30), + "30m": (3600, 15), "1m": (60, 1), "15s": (15, 1) } @@ -827,9 +825,9 @@ if __name__ == "__main__": get_telem( { "queryStringParameters":{ - "serial": "S3210639", - "duration": "0", - "datetime": "2021-07-26T06:49:29.001000Z" + # "serial": "S3210639", + "duration": "3h", + # "datetime": "2021-07-26T06:49:29.001000Z" } }, {} )