diff --git a/main.tf b/main.tf index 8683184..32e7150 100644 --- a/main.tf +++ b/main.tf @@ -462,6 +462,31 @@ resource "aws_lambda_function" "get_sondes" { ] } + +resource "aws_lambda_function" "get_telem" { + function_name = "get_telem" + handler = "lambda_function.get_telem" + filename = "${path.module}/build/query.zip" + source_code_hash = data.archive_file.query.output_base64sha256 + publish = true + memory_size = 256 + role = aws_iam_role.IAMRole5.arn + runtime = "python3.7" + timeout = 30 + tracing_config { + mode = "Active" + } + environment { + variables = { + "ES" = "es.${local.domain_name}" + } + } + layers = [ + "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:xray-python:1", + "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:iot:3" + ] +} + resource "aws_lambda_function" "sign_socket" { function_name = "sign-websocket" handler = "lambda_function.lambda_handler" @@ -500,6 +525,13 @@ resource "aws_lambda_permission" "get_sondes" { source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:r03szwwq41/*/*/sondes" } +resource "aws_lambda_permission" "get_telem" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.get_telem.arn + principal = "apigateway.amazonaws.com" + source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:r03szwwq41/*/*/sondes/telemetry" +} + resource "aws_lambda_permission" "LambdaPermission2" { action = "lambda:InvokeFunction" function_name = aws_lambda_function.LambdaFunction.arn @@ -597,6 +629,14 @@ resource "aws_apigatewayv2_route" "get_sondes" { target = "integrations/${aws_apigatewayv2_integration.get_sondes.id}" } +resource "aws_apigatewayv2_route" "get_telem" { + api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id + api_key_required = false + authorization_type = "NONE" + route_key = "GET /sondes/telemetry" + target = "integrations/${aws_apigatewayv2_integration.get_telem.id}" +} + resource "aws_apigatewayv2_integration" "sign_socket" { api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id connection_type = "INTERNET" @@ -617,6 +657,16 @@ resource "aws_apigatewayv2_integration" "get_sondes" { payload_format_version = "2.0" } +resource "aws_apigatewayv2_integration" "get_telem" { + api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id + connection_type = "INTERNET" + integration_method = "POST" + integration_type = "AWS_PROXY" + integration_uri = aws_lambda_function.get_telem.arn + timeout_milliseconds = 30000 + payload_format_version = "2.0" +} + resource "aws_apigatewayv2_integration" "ApiGatewayV2Integration" { api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id connection_type = "INTERNET" diff --git a/query/lambda_function.py b/query/lambda_function.py index 8b0b0a5..990c643 100644 --- a/query/lambda_function.py +++ b/query/lambda_function.py @@ -6,46 +6,31 @@ from botocore.auth import SigV4Auth import json import os -HOST=os.getenv("ES") +HOST = os.getenv("ES") # get current sondes, filter by date, location + def get_sondes(event, context): path = "telm-*/_search" payload = { "aggs": { "2": { - "terms": { - "field": "serial.keyword", - "order": { - "_key": "desc" + "terms": { + "field": "serial.keyword", + "order": {"_key": "desc"}, + "size": 10000, }, - "size": 10000 - }, - "aggs": { - "1": { - "top_hits": { - "size": 1, - "sort": [ - { - "datetime": { - "order": "desc" + "aggs": { + "1": { + "top_hits": { + "size": 1, + "sort": [{"datetime": {"order": "desc"}}], } } - ] - } - } - } + }, } }, - "query": { - "bool": { - "filter": [ - { - "match_all": {} - } - ] - } - } + "query": {"bool": {"filter": [{"match_all": {}}]}}, } # add filters @@ -56,60 +41,154 @@ def get_sondes(event, context): "range": { "datetime": { "gte": f"now-{int(event['queryStringParameters']['last'])}s", - "lte": "now" + "lte": "now", } } } ) - if "lat" in event["queryStringParameters"] and "lon" in event["queryStringParameters"] and "distance" in event["queryStringParameters"]: + if ( + "lat" in event["queryStringParameters"] + and "lon" in event["queryStringParameters"] + and "distance" in event["queryStringParameters"] + ): payload["query"]["bool"]["filter"].append( { "geo_distance": { "distance": f"{int(event['queryStringParameters']['distance'])}m", "position": { - "lat": float(event['queryStringParameters']['lat']), - "lon": float(event['queryStringParameters']['lon']) - } + "lat": float(event["queryStringParameters"]["lat"]), + "lon": float(event["queryStringParameters"]["lon"]), + }, } } ) # if the user doesn't specify a range we should add one - 24 hours is probably a good start if "range" not in payload["query"]["bool"]["filter"]: payload["query"]["bool"]["filter"].append( - { - "range": { - "datetime": { - "gte": "now-1d", - "lte": "now" + {"range": {"datetime": {"gte": "now-1d", "lte": "now"}}} + ) + + results = es_request(payload, path, "POST") + buckets = results["aggregations"]["2"]["buckets"] + sondes = { + bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"][ + "hits" + ][0]["_source"] + for bucket in buckets + } + return json.dumps(sondes) + + +def get_telem(event, context): + + durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES + "3d": (259200, 1200), # 3d, 20m + "1d": (86400, 600), # 1d, 10m + "6h": (21600, 60), # 6h, 1m + "3h": (10800, 1), # 3h, 1s + } + duration_query = "3h" + + if ( + "queryStringParameters" in event + and "duration" in event["queryStringParameters"] + ): + if event["queryStringParameters"]["duration"] in durations: + duration_query = event["queryStringParameters"]["duration"] + else: + return f"Duration must be either {', '.join(durations.keys())}" + + (duration, interval) = durations[duration_query] + + path = "telm-*/_search" + payload = { + "aggs": { + "2": { + "terms": { + "field": "serial.keyword", + "order": {"_key": "desc"}, + "size": 10000, + }, + "aggs": { + "3": { + "date_histogram": { + "field": "datetime", + "fixed_interval": f"{str(interval)}s", + "time_zone": "Australia/Brisbane", + "min_doc_count": 1, + }, + "aggs": { + "1": { + "top_hits": { + "docvalue_fields": [ + {"field": "position"}, + {"field": "alt"}, + {"field": "datetime"}, + ], + "_source": "position", + "size": 1, + "sort": [{"datetime": {"order": "desc"}}], + } + } + }, + } + }, + } + }, + "query": { + "bool": { + "filter": [ + {"match_all": {}}, + { + "range": { + "datetime": {"gte": f"now-{str(duration)}s", "lt": "now"} } } + ] + } + }, + } + if "queryStringParameters" in event: + if "serial" in event["queryStringParameters"]: + print("test") + payload["query"]["bool"]["filter"].append( + { + "match_phrase": { + "serial": str(event["queryStringParameters"]["serial"]) + } } ) - + print(payload) results = es_request(payload, path, "POST") - buckets = results["aggregations"]["2"]["buckets"] - sondes = { bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"]["hits"][0]["_source"] for bucket in buckets} - return json.dumps(sondes) + output = { + sonde["key"]: { + data["key_as_string"]: { + field: data["1"]["hits"]["hits"][0]["fields"][field][0] + for field in data["1"]["hits"]["hits"][0]["fields"] + } + for data in sonde["3"]["buckets"] + } + for sonde in results["aggregations"]["2"]["buckets"] + } + return json.dumps(output) + def es_request(payload, path, method): - #get aws creds + # get aws creds session = boto3.Session() params = json.dumps(payload) - headers = { - 'Host': HOST, - 'Content-Type': "application/json" - } - request = AWSRequest(method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers) - SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request) - + headers = {"Host": HOST, "Content-Type": "application/json"} + request = AWSRequest( + method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers + ) + SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request) session = URLLib3Session() r = session.send(request.prepare()) return json.loads(r.text) - if __name__ == "__main__": - #print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {})) - print(get_sondes({},{})) \ No newline at end of file + # print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {})) + print(get_telem({"queryStringParameters":{"serial": "R4450388", "duration": "3h"}}, {}))