From a061c4ae67520bc20cd4656323e9fe5117a2595b Mon Sep 17 00:00:00 2001 From: xss Date: Sat, 15 Jul 2023 11:30:56 +1000 Subject: [PATCH] Add site based endpoint --- lambda/historic_es_to_s3/__init__.py | 4 +- lambda/query/__init__.py | 62 ++++++++++++++++++++++++++++ lambda/query/__main__.py | 16 ++++--- query.tf | 48 +++++++++++++++++++++ swagger.yaml | 20 +++++++++ 5 files changed, 143 insertions(+), 7 deletions(-) diff --git a/lambda/historic_es_to_s3/__init__.py b/lambda/historic_es_to_s3/__init__.py index eba17b1..34542bd 100644 --- a/lambda/historic_es_to_s3/__init__.py +++ b/lambda/historic_es_to_s3/__init__.py @@ -71,7 +71,7 @@ def fetch_s3(serial): else: raise -def fetch_launch_sites(): +def fetch_launch_sites(time_filter="24h"): payload = { "aggs": { "2": { @@ -136,7 +136,7 @@ def fetch_launch_sites(): { "range": { "datetime": { - "gte": "now-24h", + "gte": f"now-{time_filter}", "lte": "now", "format": "strict_date_optional_time" } diff --git a/lambda/query/__init__.py b/lambda/query/__init__.py index 3cf7dc8..40ed71f 100644 --- a/lambda/query/__init__.py +++ b/lambda/query/__init__.py @@ -5,6 +5,8 @@ import gzip from io import BytesIO import es +from historic_es_to_s3 import fetch_launch_sites + def get_sondes(event, context): path = "telm-*/_search" payload = { @@ -81,6 +83,66 @@ def get_sondes(event, context): } return json.dumps(sondes) +def get_sondes_site(event, context): + site = str(event["pathParameters"]["site"]) + if "queryStringParameters" in event and "last" in event["queryStringParameters"]: + last_seconds = abs(int(event['queryStringParameters']['last'])) + if last_seconds > 60 * 60 * 24 * 7: + return f"Duration too long. Must be less than 7 days" + last_time = f"{last_seconds}s" + else: + last_time = "1d" + launch_sites = fetch_launch_sites(time_filter=last_time) + path = "telm-*/_search" + payload = { + "size": 0, + "aggs": { + "2": { + "terms": { + "field": "serial.keyword", + "order": {"_key": "desc"}, + "size": 10000, + }, + "aggs": { + "1": { + "top_hits": { + "size": 1, + "sort": [{"datetime": {"order": "desc"}}], + } + } + }, + } + }, + "query": { + "bool": { + "filter": [ + {"match_all": {}}, + ] + } + }, + } + + + + payload["query"]["bool"]["filter"].append( + {"range": {"datetime": {"gte": f"now-{last_time}", "lte": "now+1m"}}} + ) + try: + results = es.request(json.dumps(payload), path, "POST") + except: + print(json.dumps(event)) + raise + output = {} + buckets = results["aggregations"]["2"]["buckets"] + sondes = { + bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"][ + "hits" + ][0]["_source"] + for bucket in buckets + if bucket["1"]["hits"]["hits"][0]["_source"]["serial"] in launch_sites and + launch_sites[bucket["1"]["hits"]["hits"][0]["_source"]["serial"]]['launch_site'] == site + } + return json.dumps(sondes) def get_telem(event, context): diff --git a/lambda/query/__main__.py b/lambda/query/__main__.py index 2a61301..c71f79e 100644 --- a/lambda/query/__main__.py +++ b/lambda/query/__main__.py @@ -64,12 +64,18 @@ from . import * # ) # print(get_sites({"queryStringParameters":{"station":-1}},{})) -print(get_telem( +print(get_sondes_site( { - "queryStringParameters": { - "duration": "3h", - # "serial": "S4430086" - }},{} + "pathParameters": { + "site": "10868" + + }, + "queryStringParameters": {"last": str(60* 60 *24 * 7)} + },{} + # "queryStringParameters": { + # "duration": "3h", + # # "serial": "S4430086" + # }},{} )) # b=get_telem( # { diff --git a/query.tf b/query.tf index c263be4..10c5cf5 100644 --- a/query.tf +++ b/query.tf @@ -71,6 +71,28 @@ resource "aws_lambda_function" "get_sites" { } } +resource "aws_lambda_function" "get_sondes_site" { + function_name = "get_sondes_site" + handler = "query.get_sondes_site" + s3_bucket = aws_s3_bucket_object.lambda.bucket + s3_key = aws_s3_bucket_object.lambda.key + source_code_hash = data.archive_file.lambda.output_base64sha256 + publish = true + memory_size = 256 + role = aws_iam_role.basic_lambda_role.arn + runtime = "python3.9" + timeout = 30 + architectures = ["arm64"] + environment { + variables = { + "ES" = "es.${local.domain_name}" + } + } + tags = { + Name = "get_sondes_site" + } +} + resource "aws_lambda_function" "get_listener_telemetry" { function_name = "get_listener_telemetry" handler = "query.get_listener_telemetry" @@ -94,6 +116,32 @@ resource "aws_lambda_function" "get_listener_telemetry" { } +resource "aws_apigatewayv2_integration" "get_sondes_site" { + api_id = aws_apigatewayv2_api.main.id + connection_type = "INTERNET" + integration_method = "POST" + integration_type = "AWS_PROXY" + integration_uri = aws_lambda_function.get_sondes_site.arn + timeout_milliseconds = 30000 + payload_format_version = "2.0" +} + +resource "aws_lambda_permission" "get_sondes_site" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.get_sondes_site.arn + principal = "apigateway.amazonaws.com" + source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/sondes/site/{site}" +} + +resource "aws_apigatewayv2_route" "get_sondes_site" { + api_id = aws_apigatewayv2_api.main.id + api_key_required = false + authorization_type = "NONE" + route_key = "GET /sondes/site/{site}" + target = "integrations/${aws_apigatewayv2_integration.get_sondes_site.id}" +} + + resource "aws_lambda_permission" "get_sondes" { action = "lambda:InvokeFunction" diff --git a/swagger.yaml b/swagger.yaml index 6e86921..16d4995 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -235,6 +235,26 @@ paths: description: Returns a dictionary keyed by serial number of a dictionary of times with SondeHub Telemetry values schema: $ref: "#/definitions/sonde_telm_results_format" + /sondes/site/{site}: + get: + summary: Request latest sonde data indexed by serial number based on site ID + produces: + - "application/json" + parameters: + - in: query + name: last + description: "How far back to search in seconds. Defaults to 24hrs. Limited to 7 days" + type: number + - in: path + name: site + description: Site number of the radiosonde to request data for (see /sites endpoint) + required: true + type: string + responses: + 200: + description: Returns a dictionary keyed by serial number of a dictionary of times with SondeHub Telemetry values + schema: + $ref: "#/definitions/sonde_telm_results_format" /sonde/{serial}: get: summary: Request telemetry data for an individual radiosonde