"""Query handlers that read radiosonde telemetry from Elasticsearch.

Each public handler takes an ``(event, context)`` pair, reads its options from
``event["queryStringParameters"]`` and returns a string (JSON on success).

NOTE(review): this looks like a set of AWS Lambda handlers behind API Gateway;
confirm against the deployment configuration.
"""
import json
import os
import sys
import traceback
from datetime import datetime, timedelta, timezone

import boto3
import botocore.credentials
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session

# Elasticsearch host name (no scheme), taken from the ``ES`` environment variable.
HOST = os.getenv("ES")

# Index pattern + endpoint shared by every query in this module.
_SEARCH_PATH = "telm-*/_search"

# Allowed look-back windows, as name -> (duration in seconds, bucket size in
# seconds). Ideally these would not need to be predefined, but it is a lot of
# data and we don't want to overload ES.
_TELEM_DURATIONS = {
    "3d": (259200, 1200),  # 3d, 20m
    "1d": (86400, 600),  # 1d, 10m
    "6h": (21600, 60),  # 6h, 1m
    "3h": (10800, 10),  # 3h, 10s
}

_DATANEW_DURATIONS = {
    "3days": (259200, 1200),  # 3d, 20m
    "1day": (86400, 600),  # 1d, 10m
    "12hours": (43200, 120),  # 12h, 2m
    "6hours": (21600, 60),  # 6h, 1m
    "3hours": (10800, 10),  # 3h, 10s
    "1hour": (3600, 1),  # 1h, 1s
}

# ``vehicles`` values that mean "no specific vehicle requested".
_ALL_VEHICLES = ("RS_*;*chase", "")

# Fields a document must carry to be reported by ``get_listeners``; the same
# list is requested back as doc values.
_LISTENER_FIELDS = [
    "uploader_position",
    "uploader_alt",
    "uploader_antenna.keyword",
    "software_name.keyword",
    "software_version.keyword",
    "datetime",
]


def _query_params(event):
    """Return the request's query-string parameters as a dict.

    A missing or ``None`` ``queryStringParameters`` entry yields ``{}`` so
    callers can use plain ``in`` / ``.get`` checks.
    """
    return event.get("queryStringParameters") or {}


def _parse_iso(value):
    """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def _histogram_payload(gte, lt, interval):
    """Build the per-serial date-histogram search body.

    Documents with ``gte <= datetime < lt`` are grouped by ``serial.keyword``,
    then bucketed every ``interval`` seconds; each non-empty bucket keeps only
    its most recent document.
    """
    return {
        "aggs": {
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {"_key": "desc"},
                    "size": 10000,
                },
                "aggs": {
                    "3": {
                        "date_histogram": {
                            "field": "datetime",
                            "fixed_interval": f"{interval}s",
                            "time_zone": "Australia/Brisbane",
                            "min_doc_count": 1,
                        },
                        "aggs": {
                            "1": {
                                "top_hits": {
                                    "size": 1,
                                    "sort": [{"datetime": {"order": "desc"}}],
                                }
                            }
                        },
                    }
                },
            }
        },
        "query": {
            "bool": {
                "filter": [
                    {"match_all": {}},
                    {
                        "range": {
                            "datetime": {
                                "gte": gte.isoformat(),
                                "lt": lt.isoformat(),
                            }
                        }
                    },
                ]
            }
        },
    }


def get_sondes(event, context):
    """Return the latest telemetry document for every sonde, as JSON.

    Optional query-string parameters:
        last: only consider documents from the last ``last`` seconds.
        lat, lon, distance: only consider documents whose ``position`` is
            within ``distance`` metres of the point (all three are required
            for the filter to apply).

    When ``last`` is not given, the search is limited to the last 24 hours.

    Returns:
        A JSON object mapping each serial to its most recent ``_source``.

    Raises:
        ValueError: if ``last``, ``distance``, ``lat`` or ``lon`` is not numeric.
    """
    params = _query_params(event)
    filters = [{"match_all": {}}]

    if "last" in params:
        filters.append(
            {
                "range": {
                    "datetime": {
                        "gte": f"now-{int(params['last'])}s",
                        "lte": "now",
                    }
                }
            }
        )

    if all(key in params for key in ("lat", "lon", "distance")):
        filters.append(
            {
                "geo_distance": {
                    "distance": f"{int(params['distance'])}m",
                    "position": {
                        "lat": float(params["lat"]),
                        "lon": float(params["lon"]),
                    },
                }
            }
        )

    # If the user doesn't specify a range we should add one - 24 hours is
    # probably a good start. The filter list holds dicts, so look inside each
    # clause rather than testing the list itself for the string "range".
    if not any("range" in clause for clause in filters):
        filters.append({"range": {"datetime": {"gte": "now-1d", "lte": "now"}}})

    payload = {
        "aggs": {
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {"_key": "desc"},
                    "size": 10000,
                },
                "aggs": {
                    "1": {
                        "top_hits": {
                            "size": 1,
                            "sort": [{"datetime": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
        "query": {"bool": {"filter": filters}},
    }

    results = es_request(payload, _SEARCH_PATH, "POST")
    latest_docs = (
        bucket["1"]["hits"]["hits"][0]["_source"]
        for bucket in results["aggregations"]["2"]["buckets"]
    )
    return json.dumps({doc["serial"]: doc for doc in latest_docs})


def get_telem(event, context):
    """Return down-sampled telemetry history per sonde, as JSON.

    Optional query-string parameters:
        duration: one of ``3d``, ``1d``, ``6h``, ``3h`` (default ``3h``); it
            selects both the window length and the bucket size.
        datetime: ISO-8601 end of the window (default: now, UTC).
        serial: restrict the result to a single serial.

    Returns:
        A JSON object ``{serial: {bucket timestamp: _source}}``, or a plain
        error string when ``duration`` is not one of the allowed values.
    """
    params = _query_params(event)

    duration_query = "3h"
    if "duration" in params:
        if params["duration"] not in _TELEM_DURATIONS:
            return f"Duration must be either {', '.join(_TELEM_DURATIONS.keys())}"
        duration_query = params["duration"]

    if "datetime" in params:
        requested_time = _parse_iso(params["datetime"])
    else:
        # Timezone-aware so the ISO string sent to ES carries its UTC offset.
        requested_time = datetime.now(timezone.utc)

    duration, interval = _TELEM_DURATIONS[duration_query]
    lt = requested_time
    gte = requested_time - timedelta(seconds=duration)

    # NOTE: top_hits returns the whole _source; an earlier restriction to
    # docvalue_fields (position, alt, datetime) was left disabled.
    payload = _histogram_payload(gte, lt, interval)
    if "serial" in params:
        payload["query"]["bool"]["filter"].append(
            {"match_phrase": {"serial": str(params["serial"])}}
        )

    results = es_request(payload, _SEARCH_PATH, "POST")
    output = {
        sonde["key"]: {
            data["key_as_string"]: data["1"]["hits"]["hits"][0]["_source"]
            for data in sonde["3"]["buckets"]
        }
        for sonde in results["aggregations"]["2"]["buckets"]
    }
    return json.dumps(output)


def _frame_to_position(frame_data):
    """Convert one telemetry ``_source`` document into a ``position`` entry.

    Raises:
        KeyError: if a mandatory field (``serial``, ``datetime``, ``lat``,
            ``lon``, ``alt``, ``vel_h``, ``uploader_callsign``) is missing.
    """
    # Optional fields only contribute to the free-text comment.
    frequency = f'{frame_data["frequency"]} MHz' if "frequency" in frame_data else ""
    pressure = f'{frame_data["pressure"]}hPa' if "pressure" in frame_data else ""
    bt = f'BT {frame_data["burst_timer"]}' if "burst_timer" in frame_data else ""
    batt = f'{frame_data["batt"]}V' if "batt" in frame_data else ""
    subtype = frame_data["subtype"] if "subtype" in frame_data else ""

    return {
        "position_id": f'{frame_data["serial"]}-{frame_data["datetime"]}',
        "mission_id": "0",
        "vehicle": frame_data["serial"],
        "server_time": frame_data["datetime"],
        "gps_time": frame_data["datetime"],
        "gps_lat": frame_data["lat"],
        "gps_lon": frame_data["lon"],
        "gps_alt": frame_data["alt"],
        "gps_heading": "",
        "gps_speed": frame_data["vel_h"],
        "picture": "",
        "temp_inside": "",
        "data": {
            "comment": f"{subtype} {frame_data['serial']} {frequency} {pressure} {bt} {batt}",
            # NOTE(review): hard-coded values, not taken from the frame.
            "temperature_external": "-34.2",
            "humidity": "17.9",
        },
        "callsign": frame_data["uploader_callsign"],
        "sequence": "0",
    }


def datanew(event, context):
    """Return recent positions as ``{"positions": {"position": [...]}}`` JSON.

    Query-string parameters:
        type: must be ``positions``.
        mode: one of ``3days``, ``1day``, ``12hours``, ``6hours``, ``3hours``,
            ``1hour``; selects the window length and bucket size.
        position_id: ``"0"`` to use the window given by ``mode``, otherwise an
            ISO-8601 timestamp to fetch everything since then.
        vehicles (optional): a serial to restrict to; ``RS_*;*chase`` or an
            empty string means all vehicles. A specific vehicle is sampled
            every 5 seconds regardless of ``mode``.
        max_positions (optional): accepted but currently ignored.

    Returns:
        The JSON document, or a plain error string when ``mode`` is invalid.

    Raises:
        ValueError: if ``type`` is not ``positions``.
        KeyError: if ``type``, ``mode`` or ``position_id`` is missing.
    """
    params = _query_params(event)

    if params["type"] != "positions":
        raise ValueError("type must be 'positions'")

    if params["mode"] not in _DATANEW_DURATIONS:
        return f"Duration must be either {', '.join(_DATANEW_DURATIONS.keys())}"
    duration, interval = _DATANEW_DURATIONS[params["mode"]]

    # Only a request for one specific vehicle gets the finer 5s sampling and
    # the serial filter; the wildcard/empty values keep the coarse interval.
    vehicles = params.get("vehicles", "")
    specific_vehicle = vehicles not in _ALL_VEHICLES
    if specific_vehicle:
        interval = 5

    lt = datetime.now(timezone.utc)
    if params["position_id"] != "0":
        gte = _parse_iso(params["position_id"])
    else:
        gte = lt - timedelta(seconds=duration)

    payload = _histogram_payload(gte, lt, interval)
    if specific_vehicle:
        payload["query"]["bool"]["filter"].append(
            {"match_phrase": {"serial": str(vehicles)}}
        )

    results = es_request(payload, _SEARCH_PATH, "POST")

    positions = []
    for sonde in results["aggregations"]["2"]["buckets"]:
        for frame in sonde["3"]["buckets"]:
            try:
                frame_data = frame["1"]["hits"]["hits"][0]["_source"]
                positions.append(_frame_to_position(frame_data))
            except (KeyError, IndexError):
                # Best effort: skip frames lacking a mandatory field.
                continue

    positions.sort(key=lambda entry: entry["position_id"])
    return json.dumps({"positions": {"position": positions}})


def _listener_description(fields):
    """Format the free-text description for one listener's doc-value fields."""
    return f"""\n
\n Radio: {fields["software_name.keyword"][0]}-{fields["software_version.keyword"][0]}
\n Antenna: {fields["uploader_antenna.keyword"][0]}
\n Last Contact: {fields["datetime"][0]}
\n
\n """


def get_listeners(event, context):
    """Return the listeners (uploaders) heard in the last 7 days, as JSON.

    Documents of type ``SondehubV1`` are excluded, as are documents missing
    any of the position/antenna/software/datetime fields. For each
    ``uploader_callsign`` the most recent document is reported.

    Returns:
        A JSON list of objects with ``name``, ``tdiff_hours`` (hours since the
        last document), ``lon``, ``lat``, ``alt`` and ``description``.
    """
    filters = [{"match_all": {}}]
    filters.extend({"exists": {"field": name}} for name in _LISTENER_FIELDS)
    filters.append(
        {
            "range": {
                "datetime": {
                    "gte": "now-7d",
                    "lte": "now",
                    "format": "strict_date_optional_time",
                }
            }
        }
    )

    payload = {
        "aggs": {
            "2": {
                "terms": {
                    "field": "uploader_callsign.keyword",
                    "order": {"_key": "desc"},
                    "size": 500,
                },
                "aggs": {
                    "1": {
                        "top_hits": {
                            "_source": False,
                            "size": 1,
                            "docvalue_fields": list(_LISTENER_FIELDS),
                            "sort": [{"datetime": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
        "size": 0,
        "query": {
            "bool": {
                "must": [],
                "filter": filters,
                "should": [],
                "must_not": [{"match_phrase": {"type": "SondehubV1"}}],
            }
        },
    }

    # This call used to pass "GET", but es_request ignored the argument and
    # always sent POST; pass "POST" explicitly so the request is unchanged now
    # that es_request honours ``method``.
    results = es_request(payload, _SEARCH_PATH, "POST")

    now = datetime.now(timezone.utc)
    output = []
    for listener in results["aggregations"]["2"]["buckets"]:
        fields = listener["1"]["hits"]["hits"][0]["fields"]
        last_seen = _parse_iso(fields["datetime"][0])
        # uploader_position is a "lat,lon" string.
        coordinates = fields["uploader_position"][0].replace(" ", "").split(",")
        output.append(
            {
                "name": listener["key"],
                # total_seconds() keeps whole days; ``.seconds`` would wrap
                # every 24 hours inside the 7-day window.
                "tdiff_hours": (now - last_seen).total_seconds() / 60 / 60,
                "lon": float(coordinates[1]),
                "lat": float(coordinates[0]),
                "alt": float(fields["uploader_alt"][0]),
                "description": _listener_description(fields),
            }
        )
    return json.dumps(output)


def es_request(payload, path, method):
    """Send a SigV4-signed JSON request to Elasticsearch and decode the reply.

    Args:
        payload: JSON-serialisable request body.
        path: path below ``https://{HOST}/``, e.g. ``telm-*/_search``.
        method: HTTP method to use, e.g. ``"POST"``.

    Returns:
        The response body parsed from JSON.
    """
    body = json.dumps(payload)
    headers = {"Host": HOST, "Content-Type": "application/json"}
    request = AWSRequest(
        method=method, url=f"https://{HOST}/{path}", data=body, headers=headers
    )
    # Sign with the ambient AWS credentials for the "es" service.
    credentials = boto3.Session().get_credentials()
    SigV4Auth(credentials, "es", "us-east-1").add_auth(request)

    response = URLLib3Session().send(request.prepare())
    return json.loads(response.text)


if __name__ == "__main__":
    # print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
    # Example datanew parameters:
    #   mode: 6hours
    #   type: positions
    #   format: json
    #   max_positions: 0
    #   position_id: 0
    #   vehicles: RS_*;*chase
    print(
        datanew(
            {
                "queryStringParameters": {
                    "mode": "1day",
                    "type": "positions",
                    "format": "json",
                    "max_positions": "0",
                    "position_id": "0",
                    "vehicles": "S2720104",
                }
            },
            {},
        )
    )