From 6e636fd2f592a31bc344815a0b59e16ce5c605dc Mon Sep 17 00:00:00 2001
From: xss
Date: Wed, 11 May 2022 13:34:57 +1000
Subject: [PATCH] add full ham telm dump endpoints

---
 ham_query.tf                 |  46 ++++++++++
 lambda/query_ham/__init__.py | 158 ++++++++++++++++++++++++++++++++++-
 lambda/query_ham/__main__.py |  24 ++---
 swagger.yaml                 |  43 +++++++++
 4 files changed, 256 insertions(+), 15 deletions(-)

diff --git a/ham_query.tf b/ham_query.tf
index 552453d..23dc54f 100644
--- a/ham_query.tf
+++ b/ham_query.tf
@@ -48,6 +48,28 @@ resource "aws_lambda_function" "ham_telem" {
   }
 }
 
+resource "aws_lambda_function" "ham_telem_full" {
+  function_name    = "ham_get_telem_full"
+  handler          = "query_ham.get_telem_full"
+  s3_bucket        = aws_s3_bucket_object.lambda.bucket
+  s3_key           = aws_s3_bucket_object.lambda.key
+  source_code_hash = data.archive_file.lambda.output_base64sha256
+  publish          = true
+  memory_size      = 1024
+  role             = aws_iam_role.basic_lambda_role.arn
+  runtime          = "python3.9"
+  timeout          = 30
+  architectures    = ["arm64"]
+  environment {
+    variables = {
+      "ES" = "es.${local.domain_name}"
+    }
+  }
+  tags = {
+    Name = "ham_get_telem_full"
+  }
+}
+
 resource "aws_lambda_permission" "ham_get" {
   action        = "lambda:InvokeFunction"
   function_name = aws_lambda_function.ham_get.arn
@@ -62,6 +84,13 @@ resource "aws_lambda_permission" "ham_telem" {
   source_arn    = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry"
 }
 
+resource "aws_lambda_permission" "ham_telem_full" {
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.ham_telem_full.arn
+  principal     = "apigateway.amazonaws.com"
+  source_arn    = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry/*"
+}
+
 
 resource "aws_apigatewayv2_route" "ham_get" {
   api_id             = aws_apigatewayv2_api.main.id
@@ -79,6 +108,14 @@ resource "aws_apigatewayv2_route" "ham_telem" {
   target             = "integrations/${aws_apigatewayv2_integration.ham_telem.id}"
 }
 
+resource "aws_apigatewayv2_route" "ham_telem_full" {
+  api_id             = aws_apigatewayv2_api.main.id
+  api_key_required   = false
+  authorization_type = "NONE"
+  route_key          = "GET /amateur/telemetry/{payload_callsign}"
+  target             = "integrations/${aws_apigatewayv2_integration.ham_telem_full.id}"
+}
+
 resource "aws_apigatewayv2_integration" "ham_get" {
   api_id                 = aws_apigatewayv2_api.main.id
   connection_type        = "INTERNET"
@@ -99,6 +136,15 @@ resource "aws_apigatewayv2_integration" "ham_telem" {
   payload_format_version = "2.0"
 }
 
+resource "aws_apigatewayv2_integration" "ham_telem_full" {
+  api_id                 = aws_apigatewayv2_api.main.id
+  connection_type        = "INTERNET"
+  integration_method     = "POST"
+  integration_type       = "AWS_PROXY"
+  integration_uri        = aws_lambda_function.ham_telem_full.arn
+  timeout_milliseconds   = 30000
+  payload_format_version = "2.0"
+}
 
 resource "aws_lambda_function" "ham_get_listener_telemetry" {
   function_name    = "ham_get_listener_telemetry"
diff --git a/lambda/query_ham/__init__.py b/lambda/query_ham/__init__.py
index e39ec10..7225f69 100644
--- a/lambda/query_ham/__init__.py
+++ b/lambda/query_ham/__init__.py
@@ -2,7 +2,7 @@ import json
 from datetime import datetime, timedelta, timezone
 import base64
 import gzip
-from io import BytesIO
+from io import BytesIO, StringIO
 import es
 
 def get(event, context):
@@ -245,6 +245,162 @@ def get_telem(event, context):
         }
     }
 
+def get_telem_full(event, context):
+    """Return every telemetry frame for one amateur payload, gzipped.
+
+    Path parameters:
+        payload_callsign: callsign to match exactly (required).
+
+    Query string parameters (all optional):
+        last: how far back to search, in seconds (default 21600 = 6 hours).
+        datetime: ISO-8601 end of the search window (default: now, UTC).
+        format: "csv" for CSV output; anything else gives a JSON list.
+
+    Returns an API Gateway proxy response. On success (status 200) the
+    body is the gzip compressed document, base64 encoded. When the encoded
+    body would exceed the size check below, a plain text 400 is returned.
+    """
+    # the key can be missing or null when the request has no query string
+    params = event.get("queryStringParameters") or {}
+
+    last = int(params.get("last", 21600))  # seconds; default is 6 hours
+
+    if "datetime" in params:
+        requested_time = datetime.fromisoformat(
+            params["datetime"].replace("Z", "+00:00")
+        )
+    else:
+        requested_time = datetime.now(timezone.utc)
+
+    lt = requested_time + timedelta(0, 1)
+    gte = requested_time - timedelta(0, last)
+
+    # NOTE: only the monthly indices holding the two ends of the window are
+    # searched. Both names carry the "ham-" prefix so that the start month is
+    # read from the amateur index as well.
+    path = (
+        f"ham-telm-{lt.year}-{lt.month:02},"
+        f"ham-telm-{gte.year}-{gte.month:02}/_search"
+    )
+    payload = {
+        "timeout": "30s",
+        "size": 10000,
+        "query": {
+            "bool": {
+                "minimum_should_match": 1,
+                "must_not": [
+                    {"match_phrase": {"software_name": "SondehubV1"}},
+                    {"match_phrase": {"payload_callsign": "xxxxxxxx"}},
+                ],
+                # keep frames reporting >= 1 satellite, or no "sats" field
+                "should": [
+                    {
+                        "bool": {
+                            "must": [
+                                {"exists": {"field": "sats"}},
+                                {"range": {"sats": {"gte": 1, "lt": None}}},
+                            ]
+                        }
+                    },
+                    {"bool": {"must_not": [{"exists": {"field": "sats"}}]}},
+                ],
+                "filter": [
+                    {"match_all": {}},
+                    {
+                        "range": {
+                            "datetime": {
+                                "gte": gte.isoformat(),
+                                "lt": lt.isoformat(),
+                            }
+                        }
+                    },
+                    {
+                        "match_phrase": {
+                            "payload_callsign": str(
+                                event["pathParameters"]["payload_callsign"]
+                            )
+                        }
+                    },
+                ],
+            }
+        },
+    }
+
+    data = []
+    scroll_ids = set()
+    try:
+        response = es.request(
+            json.dumps(payload), path, "POST", params={"scroll": "1m"}
+        )
+        while True:
+            scroll_id = response["_scroll_id"]
+            scroll_ids.add(scroll_id)
+            hits = response["hits"]["hits"]
+            if not hits:
+                break
+            data.extend(hit["_source"] for hit in hits)
+            response = es.request(
+                json.dumps({"scroll": "1m", "scroll_id": scroll_id}),
+                "_search/scroll",
+                "POST",
+            )
+    finally:
+        # release the scroll contexts even when a page request failed
+        for scroll_id in scroll_ids:
+            try:
+                es.request(
+                    json.dumps({"scroll_id": scroll_id}),
+                    "_search/scroll",
+                    "DELETE",
+                )
+            except RuntimeError as error:
+                # best effort only, the scroll was opened with a 1m keep-alive
+                print(f"Unable to clear scroll {scroll_id}: {error}")
+
+    content_type = "application/json"
+    if params.get("format") == "csv":
+        import csv
+
+        content_type = "text/csv"
+        # sorted for a stable column order, with datetime always first;
+        # this also copes with an empty result set
+        csv_keys = sorted(
+            set().union(*(row.keys() for row in data)) - {"datetime"}
+        )
+        csv_keys.insert(0, "datetime")
+        csv_output = StringIO(newline="")
+        writer = csv.DictWriter(csv_output, fieldnames=csv_keys)
+        writer.writeheader()
+        writer.writerows(data)
+        data = csv_output.getvalue()
+    else:
+        data = json.dumps(data)
+
+    compressed = BytesIO()
+    with gzip.GzipFile(fileobj=compressed, mode="w") as f:
+        f.write(data.encode("utf-8"))
+    body = base64.b64encode(compressed.getvalue()).decode()
+
+    if len(body) > (1024 * 1024 * 6) - 1000:  # check if payload is too big
+        # the error text is sent as is, so it must not be flagged as
+        # base64/gzip or clients will fail to decode it
+        return {
+            "body": "Output is too large, try a smaller time frame",
+            "isBase64Encoded": False,
+            "statusCode": 400,
+            "headers": {"content-type": "text/plain"},
+        }
+
+    return {
+        "body": body,
+        "isBase64Encoded": True,
+        "statusCode": 200,
+        "headers": {
+            "Content-Encoding": "gzip",
+            "content-type": content_type,
+        },
+    }
+
 def get_listener_telemetry(event, context):
 
 
diff --git a/lambda/query_ham/__main__.py b/lambda/query_ham/__main__.py
index ff73821..7d9c4c4 100644
--- a/lambda/query_ham/__main__.py
+++ b/lambda/query_ham/__main__.py
@@ -3,24 +3,20 @@ import base64
 
 import zlib
 
-response = get_telem(
+response = get_telem_full(
     {
+        "pathParameters": {
+            "payload_callsign" : "HORUS-V2"
+        },
         "queryStringParameters":{
-            # "payload_callsign": "HORUS-V2",
-            "duration": "3d"
+            "last": "3600",
+            "datetime": "2022-05-07T04:18:10.000000Z",
+            "format": "csv"
         }
     }, {})
-
-# response = get_listener_telemetry(
-#     {
-#         "queryStringParameters":{
-#             # "payload_callsign": "HORUS-V2",
-#             "duration": "3h"
-#         }
-#     }, {})
+print(len(response['body']))
 compressed = base64.b64decode(response['body'])
 decompressed = (zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
 
-print(json.loads(decompressed)
-)
-print(len(json.dumps(response)))
\ No newline at end of file
+#print(json.loads(decompressed))
+print(decompressed.decode().splitlines()[:5])
\ No newline at end of file
diff --git a/swagger.yaml b/swagger.yaml
index b9f507f..40407ae 100644
--- a/swagger.yaml
+++ b/swagger.yaml
@@ -75,6 +75,45 @@ paths:
           description: Returns a dictionary keyed by serial number of a dictionary of times with SondeHub Telemetry values
           schema:
             $ref: "#/definitions/amateur_query_results_format"
+  /amateur/telemetry/{payload_callsign}:
+    get:
+      summary: Request all Amateur Telemetry Data for a single payload
+      tags:
+        - amateur
+      description: >
+        Use this to download every telemetry frame received from a single amateur payload within a time window, as JSON or CSV. Do not regularly poll this endpoint, it is rate limited.
+      produces:
+        - "application/json"
+        - "text/csv"
+      parameters:
+        - in: query
+          name: last
+          description: "How far back to search in seconds. Defaults to 6 hours (21600)"
+          type: number
+        - in: query
+          name: datetime
+          description: "End time to query as an ISO-8601 time string. Defaults to now. Example: `2021-02-02T11:27:38.634Z`"
+          required: false
+          type: string
+          format: date-time
+        - in: path
+          name: payload_callsign
+          description: Callsign of the payload to query. All matching telemetry in the time window is returned.
+          required: true
+          type: string
+        - in: query
+          name: format
+          description: "If set to csv will provide a CSV of the results rather than JSON"
+          required: false
+          type: string
+          format: string
+      responses:
+        200:
+          description: Returns a list of all data received
+          schema:
+            $ref: "#/definitions/amateur_query_full_results_format"
+        400:
+          description: The result is too large to return, try a smaller time frame
   /sondes/telemetry:
     put:
       summary: Upload Radiosonde Telemetry to Sondehub database.
@@ -667,6 +706,10 @@ definitions:
       properties:
         datetime:
           $ref: "#/definitions/listener"
+  amateur_query_full_results_format:
+    type: array
+    items:
+      $ref: "#/definitions/amateur_telemetry_format"
   amateur_query_results_format:
     type: object
     properties: