diff --git a/ham_query.tf b/ham_query.tf new file mode 100644 index 0000000..295a566 --- /dev/null +++ b/ham_query.tf @@ -0,0 +1,100 @@ + +resource "aws_lambda_function" "ham_get" { + function_name = "ham_get" + handler = "query_ham.get" + s3_bucket = aws_s3_bucket_object.lambda.bucket + s3_key = aws_s3_bucket_object.lambda.key + source_code_hash = data.archive_file.lambda.output_base64sha256 + publish = true + memory_size = 256 + role = aws_iam_role.basic_lambda_role.arn + runtime = "python3.9" + timeout = 30 + architectures = ["arm64"] + environment { + variables = { + "ES" = "es.${local.domain_name}" + } + } + tags = { + Name = "ham_get" + } +} + + + + + + +resource "aws_lambda_function" "ham_telem" { + function_name = "ham_get_telem" + handler = "query_ham.get_telem" + s3_bucket = aws_s3_bucket_object.lambda.bucket + s3_key = aws_s3_bucket_object.lambda.key + source_code_hash = data.archive_file.lambda.output_base64sha256 + publish = true + memory_size = 256 + role = aws_iam_role.basic_lambda_role.arn + runtime = "python3.9" + timeout = 30 + architectures = ["arm64"] + environment { + variables = { + "ES" = "es.${local.domain_name}" + } + } + tags = { + Name = "ham_get_telem" + } +} + +resource "aws_lambda_permission" "ham_get" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.ham_get.arn + principal = "apigateway.amazonaws.com" + source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur" +} + +resource "aws_lambda_permission" "ham_telem" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.ham_telem.arn + principal = "apigateway.amazonaws.com" + source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry" +} + + +resource "aws_apigatewayv2_route" "ham_get" { + api_id = aws_apigatewayv2_api.main.id + api_key_required = false + authorization_type = "NONE" + route_key = "GET /amateur" + target = "integrations/${aws_apigatewayv2_integration.ham_get.id}" +} + +resource "aws_apigatewayv2_route" "ham_telem" { + api_id = aws_apigatewayv2_api.main.id + api_key_required = false + authorization_type = "NONE" + route_key = "GET /amateur/telemetry" + target = "integrations/${aws_apigatewayv2_integration.ham_telem.id}" +} + +resource "aws_apigatewayv2_integration" "ham_get" { + api_id = aws_apigatewayv2_api.main.id + connection_type = "INTERNET" + integration_method = "POST" + integration_type = "AWS_PROXY" + integration_uri = aws_lambda_function.ham_get.arn + timeout_milliseconds = 30000 + payload_format_version = "2.0" +} + +resource "aws_apigatewayv2_integration" "ham_telem" { + api_id = aws_apigatewayv2_api.main.id + connection_type = "INTERNET" + integration_method = "POST" + integration_type = "AWS_PROXY" + integration_uri = aws_lambda_function.ham_telem.arn + timeout_milliseconds = 30000 + payload_format_version = "2.0" +} \ No newline at end of file diff --git a/lambda/query_ham/__init__.py b/lambda/query_ham/__init__.py new file mode 100644 index 0000000..dfad2d1 --- /dev/null +++ b/lambda/query_ham/__init__.py @@ -0,0 +1,215 @@ +import json +from datetime import datetime, timedelta, timezone +import base64 +import gzip +from io import BytesIO +import es + +def get(event, context): + path = "ham-telm-*/_search" + payload = { + "size": 0, + "aggs": { + "2": { + "terms": { + "field": "payload_callsign.keyword", + "order": {"_key": "desc"}, + "size": 10000, + }, + "aggs": { + "1": { + "top_hits": { + "size": 1, + "sort": [{"datetime": {"order": "desc"}}], + } + } + }, + } + }, + "query": {"bool": {"filter": [{"match_all": {}}]}}, + } + + # add filters + if "queryStringParameters" in event: + if "last" in event["queryStringParameters"]: + payload["query"]["bool"]["filter"].append( + { + "range": { + "datetime": { + "gte": f"now-{abs(int(event['queryStringParameters']['last']))}s", + "lte": "now+1m", + } + } + } + ) + else: + payload["query"]["bool"]["filter"].append( + {"range": {"datetime": {"gte": "now-1d", "lte": "now+1m"}}} + ) + + if ( + "lat" in event["queryStringParameters"] + and "lon" in event["queryStringParameters"] + and "distance" in event["queryStringParameters"] + ): + payload["query"]["bool"]["filter"].append( + { + "geo_distance": { + "distance": f"{int(event['queryStringParameters']['distance'])}m", + "position": { + "lat": float(event["queryStringParameters"]["lat"]), + "lon": float(event["queryStringParameters"]["lon"]), + }, + } + } + ) + else: + payload["query"]["bool"]["filter"].append( + {"range": {"datetime": {"gte": "now-1d", "lte": "now+1m"}}} + ) + try: + results = es.request(json.dumps(payload), path, "POST") + except: + print(json.dumps(event)) + raise + buckets = results["aggregations"]["2"]["buckets"] + sondes = { + bucket["1"]["hits"]["hits"][0]["_source"]["payload_callsign"]: bucket["1"]["hits"][ + "hits" + ][0]["_source"] + for bucket in buckets + } + return json.dumps(sondes) + + +def get_telem(event, context): + + durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES + "3d": (259200, 1200), # 3d, 20m + "1d": (86400, 600), # 1d, 10m + "12h": (43200, 600), # 1d, 10m + "6h": (21600, 1), # 6h, 1m + "3h": (10800, 1), # 3h, 10s + "1h": (3600, 1), + "30m": (1800, 1), + "1m": (60, 1), + "15s": (15, 1), + "0": (0, 1) # for getting a single time point + } + duration_query = "3h" + requested_time = datetime.now(timezone.utc) + + if ( + "queryStringParameters" in event + and "duration" in event["queryStringParameters"] + ): + if event["queryStringParameters"]["duration"] in durations: + duration_query = event["queryStringParameters"]["duration"] + else: + return f"Duration must be either {', '.join(durations.keys())}" + + if ( + "queryStringParameters" in event + and "datetime" in event["queryStringParameters"] + ): + requested_time = datetime.fromisoformat( + event["queryStringParameters"]["datetime"].replace("Z", "+00:00") + ) + + (duration, interval) = durations[duration_query] + if "payload_callsign" in event["queryStringParameters"]: + interval = 1 + lt = requested_time + timedelta(0, 1) + gte = requested_time - timedelta(0, duration) + + path = f"ham-telm-{lt.year:2}-{lt.month:02},telm-{gte.year:2}-{gte.month:02}/_search" + payload = { + "timeout": "30s", + "size": 0, + "aggs": { + "2": { + "terms": { + "field": "payload_callsign.keyword", + "order": {"_key": "desc"}, + "size": 10000, + }, + "aggs": { + "3": { + "date_histogram": { + "field": "datetime", + "fixed_interval": f"{str(interval)}s", + "min_doc_count": 1, + }, + "aggs": { + "1": { + "top_hits": { + # "docvalue_fields": [ + # {"field": "position"}, + # {"field": "alt"}, + # {"field": "datetime"}, + # ], + # "_source": "position", + "size": 10, + "sort": [ + {"datetime": {"order": "desc"}} + ], + } + } + }, + } + }, + } + }, + "query": { + "bool": { + "must_not": [{"match_phrase": {"software_name": "SondehubV1"}}, {"match_phrase": {"payload_callsign": "xxxxxxxx"}}], + "filter": [ + {"match_all": {}}, + { + "range": { + "datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()} + } + }, + ] + } + }, + } + if "queryStringParameters" in event: + if "payload_callsign" in event["queryStringParameters"]: + payload["query"]["bool"]["filter"].append( + { + "match_phrase": { + "payload_callsign": str(event["queryStringParameters"]["payload_callsign"]) + } + } + ) + results = es.request(json.dumps(payload), path, "POST") + output = { + sonde["key"]: { + data["key_as_string"]: dict(data["1"]["hits"]["hits"][0]["_source"], + uploaders=[ #add additional uploader information + {key:value for key,value in uploader['_source'].items() if key in ["snr","rssi","uploader_callsign", "frequency"]} + for uploader in data["1"]["hits"]["hits"] + ]) + for data in sonde["3"]["buckets"] + } + for sonde in results["aggregations"]["2"]["buckets"] + } + + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + json_response = json.dumps(output) + f.write(json_response.encode('utf-8')) + + gzippedResponse = compressed.getvalue() + return { + "body": base64.b64encode(gzippedResponse).decode(), + "isBase64Encoded": True, + "statusCode": 200, + "headers": { + "Content-Encoding": "gzip", + "content-type": "application/json" + } + + } + diff --git a/lambda/query_ham/__main__.py b/lambda/query_ham/__main__.py new file mode 100644 index 0000000..c3f0b6a --- /dev/null +++ b/lambda/query_ham/__main__.py @@ -0,0 +1,18 @@ +from . import * +import base64 + +import zlib + +response = get_telem( + { + "queryStringParameters":{ + "payload_callsign": "HORUS-V2", + "duration": "3d" + } + }, {}) +compressed = base64.b64decode(response['body']) + +decompressed = (zlib.decompress(compressed, 16 + zlib.MAX_WBITS)) +print(json.loads(decompressed) +) +print(len(json.dumps(response))) \ No newline at end of file