diff --git a/historic.tf b/historic.tf
index ddb3ac6..2c87e00 100644
--- a/historic.tf
+++ b/historic.tf
@@ -78,10 +78,10 @@ resource "aws_lambda_function" "historic_to_s3" {
   filename         = "${path.module}/build/historic_to_s3.zip"
   source_code_hash = data.archive_file.historic_to_s3.output_base64sha256
   publish          = true
-  memory_size      = 512
+  memory_size      = 2048
   role             = aws_iam_role.historic.arn
   runtime          = "python3.8"
-  timeout          = 30
+  timeout          = 60
   reserved_concurrent_executions = 8
   environment {
     variables = {
diff --git a/historic/historic_es_to_s3/index.py b/historic/historic_es_to_s3/index.py
index 5a8bd5b..5009a04 100644
--- a/historic/historic_es_to_s3/index.py
+++ b/historic/historic_es_to_s3/index.py
@@ -7,6 +7,7 @@ import botocore.credentials
 import os
 import gzip
 from botocore.exceptions import ClientError
+from io import BytesIO
 
 HOST = os.getenv("ES")
 BUCKET = "sondehub-history"
@@ -17,7 +18,13 @@ def es_request(payload, path, method, params=None):
     # get aws creds
     session = boto3.Session()
 
-    headers = {"Host": HOST, "Content-Type": "application/json"}
+    compressed = BytesIO()
+    with gzip.GzipFile(fileobj=compressed, mode='w') as f:
+        f.write(payload.encode('utf-8'))
+    payload = compressed.getvalue()
+
+    headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding": "gzip"}
+
     request = AWSRequest(
         method=method, url=f"https://{HOST}/{path}", data=payload, headers=headers, params=params
     )
diff --git a/historic/queue_data_update/index.py b/historic/queue_data_update/index.py
index 9f33af1..5d9cbd8 100644
--- a/historic/queue_data_update/index.py
+++ b/historic/queue_data_update/index.py
@@ -9,6 +9,8 @@ import zlib
 import base64
 import datetime
 import os
+import gzip
+from io import BytesIO
 
 HOST = os.getenv("ES")
 
@@ -23,7 +25,14 @@ def es_request(payload, path, method):
     session = boto3.Session()
 
     params = json.dumps(payload)
-    headers = {"Host": HOST, "Content-Type": "application/json"}
+
+    compressed = BytesIO()
+    with gzip.GzipFile(fileobj=compressed, mode='w') as f:
+        f.write(params.encode('utf-8'))
+    params = compressed.getvalue()
+
+    headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding": "gzip"}
+
     request = AWSRequest(
         method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
     )
diff --git a/history/lambda_function.py b/history/lambda_function.py
index 05e80c6..1731001 100644
--- a/history/lambda_function.py
+++ b/history/lambda_function.py
@@ -8,7 +8,8 @@ import os
 from datetime import datetime, timedelta, timezone
 import sys, traceback
 import uuid
-
+import gzip
+from io import BytesIO
 
 # TODO , HEAD S3 object, if it's less than 24 hours check ES, else 302 to bucket
 
@@ -84,7 +85,14 @@ def es_request(payload, path, method):
     session = boto3.Session()
 
     params = json.dumps(payload)
-    headers = {"Host": HOST, "Content-Type": "application/json"}
+
+    compressed = BytesIO()
+    with gzip.GzipFile(fileobj=compressed, mode='w') as f:
+        f.write(params.encode('utf-8'))
+    params = compressed.getvalue()
+
+
+    headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding": "gzip"}
     request = AWSRequest(
         method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
     )
@@ -98,7 +106,7 @@ def es_request(payload, path, method):
 if __name__ == "__main__":
     print(
         history(
-            {"pathParameters": {"serial": "S5050020"}}, {}
+            {"pathParameters": {"serial": "T1510227"}}, {}
         )
     )
diff --git a/main.tf b/main.tf
index 09e721b..02b0009 100644
--- a/main.tf
+++ b/main.tf
@@ -619,27 +619,7 @@ resource "aws_lambda_function" "listeners" {
 }
-resource "aws_lambda_function" "datanew" { - function_name = "datanew" - handler = "lambda_function.datanew" - filename = "${path.module}/build/query.zip" - source_code_hash = data.archive_file.query.output_base64sha256 - publish = true - memory_size = 1024 - role = aws_iam_role.IAMRole5.arn - runtime = "python3.7" - timeout = 30 - environment { - variables = { - "ES" = "es.${local.domain_name}" - } - } - layers = [ - "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:xray-python:1", - "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:iot:3" - ] -} resource "aws_lambda_function" "predictions" { function_name = "predictions" @@ -779,12 +759,6 @@ resource "aws_lambda_permission" "listeners" { source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/listeners" } -resource "aws_lambda_permission" "datanew" { - action = "lambda:InvokeFunction" - function_name = aws_lambda_function.datanew.arn - principal = "apigateway.amazonaws.com" - source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/datanew" -} resource "aws_lambda_permission" "predictions" { action = "lambda:InvokeFunction" @@ -848,6 +822,7 @@ resource "aws_s3_bucket" "S3Bucket" { resource "aws_cloudwatch_log_group" "LogsLogGroup" { name = "/aws/lambda/sonde-api-to-iot-core" + retention_in_days = 30 } resource "aws_apigatewayv2_api" "ApiGatewayV2Api" { @@ -951,13 +926,7 @@ resource "aws_apigatewayv2_route" "listeners" { target = "integrations/${aws_apigatewayv2_integration.listeners.id}" } -resource "aws_apigatewayv2_route" "datanew" { - api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id - api_key_required = false - authorization_type = "NONE" - route_key = "GET /datanew" - target = "integrations/${aws_apigatewayv2_integration.datanew.id}" -} + resource "aws_apigatewayv2_route" "predictions" { api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id @@ -1023,15 +992,6 @@ resource "aws_apigatewayv2_integration" "listeners" { payload_format_version = "2.0" } -resource "aws_apigatewayv2_integration" "datanew" { - api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id - connection_type = "INTERNET" - integration_method = "POST" - integration_type = "AWS_PROXY" - integration_uri = aws_lambda_function.datanew.arn - timeout_milliseconds = 30000 - payload_format_version = "2.0" -} resource "aws_apigatewayv2_integration" "predictions" { api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id @@ -1153,7 +1113,7 @@ EOF ebs_options { ebs_enabled = true volume_type = "gp2" - volume_size = 500 + volume_size = 400 } log_publishing_options { cloudwatch_log_group_arn = "arn:aws:logs:us-east-1:143841941773:log-group:/aws/aes/domains/sondes-v2/application-logs" diff --git a/predict/lambda_function.py b/predict/lambda_function.py index d441afd..3cc809d 100644 --- a/predict/lambda_function.py +++ b/predict/lambda_function.py @@ -10,8 +10,8 @@ import sys, traceback import http.client import math import logging - -logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) +import gzip +from io import BytesIO HOST = os.getenv("ES") @@ -323,9 +323,16 @@ def es_request(payload, path, method): session = boto3.Session() params = json.dumps(payload) - headers = {"Host": HOST, "Content-Type": "application/json"} + + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + f.write(params.encode('utf-8')) + params = 
compressed.getvalue() + + + headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"} request = AWSRequest( - method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers + method=method, url=f"https://{HOST}/{path}", data=params, headers=headers ) SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request) @@ -344,7 +351,7 @@ if __name__ == "__main__": # vehicles: RS_*;*chase print(predict( {"queryStringParameters" : { - # "vehicles": "S4610686" + "vehicles": "" }},{} )) diff --git a/query/lambda_function.py b/query/lambda_function.py index d8b93eb..6d5035f 100644 --- a/query/lambda_function.py +++ b/query/lambda_function.py @@ -347,320 +347,6 @@ def get_listener_telemetry(event, context): } -def datanew(event, context): - durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES - "3days": (259200, 1800), # 3d, 20m - "1day": (86400, 900), # 1d, 10m - "12hours": (43200, 480), # 12h, 2m - "6hours": (21600, 180), # 6h, 1m - "3hours": (10800, 90), # 3h, 10s - "1hour": (3600, 30), # 1h, 5s - } - duration_query = "1hour" - requested_time = datetime.now(timezone.utc) - - max_positions = ( - int(event["queryStringParameters"]["max_positions"]) - if "max_positions" in event["queryStringParameters"] - else 10000 - ) - - if event["queryStringParameters"]["mode"] in durations: - duration_query = event["queryStringParameters"]["mode"] - (duration, interval) = durations[duration_query] - elif event["queryStringParameters"]["mode"] == "single": - duration = 259200 - interval = 1 - else: - return f"Duration must be either {', '.join(durations.keys())}" - - - if "vehicles" in event["queryStringParameters"] and ( - event["queryStringParameters"]["vehicles"] != "RS_*;*chase" - and event["queryStringParameters"]["vehicles"] != "" - ): - interval = 2 - - - if event["queryStringParameters"]["position_id"] != "0": - if event["queryStringParameters"]["mode"] == "single": - position_id = event["queryStringParameters"]["position_id"] - matches = re.search("(.+)-(\d{4}-\d{2}-\d{2}\w\d{2}:\d{2}:\d{2}.\d+)Z",position_id).groups() - matched_time = matches[1].replace("Z", "+00:00") - matched_vehicle = matches[0] - requested_time = datetime.fromisoformat(matched_time) - lt = requested_time - gte = requested_time - - else: - requested_time = datetime.fromisoformat( - event["queryStringParameters"]["position_id"].replace("Z", "+00:00") - ) - lt = requested_time + timedelta(seconds=duration) - gte = requested_time - - elif event["queryStringParameters"]["mode"] == "single": - return f"Single requires a position id specified" - else: - lt = datetime.now(timezone.utc) + timedelta(seconds=60) - gte = datetime.now(timezone.utc) - timedelta(0, duration) - output = {"positions": {"position": []}} - if "chase_only" not in event["queryStringParameters"] or event["queryStringParameters"]["chase_only"] != "true": - path = "telm-*/_search" - payload = { - "timeout": "30s", - "aggs": { - "2": { - "terms": { - "field": "serial.keyword", - "order": {"_key": "desc"}, - "size": 10000, - }, - "aggs": { - "3": { - "date_histogram": { - "field": "datetime", - "fixed_interval": f"{str(interval)}s", - "min_doc_count": 1, - }, - "aggs": { - "1": { - "top_hits": { - "size": 5, - "sort": [ - {"datetime": {"order": "desc"}}, - {"pressure": {"order": "desc","mode" : "median"}}, - {"humidity": {"order": "desc","mode" : "median"}}, - {"temp": {"order": "desc","mode" : "median"}}, - {"alt": {"order": "desc","mode" : 
"median"}} - ], - } - } - }, - } - }, - } - }, - "query": { - "bool": { - "filter": [ - {"match_all": {}}, - { - "range": { - "datetime": {"gte": gte.isoformat(), "lte": lt.isoformat()} - } - }, - ], - "must_not": [{"match_phrase": {"software_name": "SondehubV1"}}, {"match_phrase": {"serial": "xxxxxxxx"}}], - } - }, - } - if ( - "vehicles" in event["queryStringParameters"] - and event["queryStringParameters"]["vehicles"] != "RS_*;*chase" - and event["queryStringParameters"]["vehicles"] != "" - ): - payload["query"]["bool"]["filter"].append( - { - "match_phrase": { - "serial": str(event["queryStringParameters"]["vehicles"]) - } - } - ) - if event["queryStringParameters"]["mode"] == "single": - payload["query"]["bool"]["filter"].append( - { - "match_phrase": { - "serial": matched_vehicle - } - } - ) - results = es_request(payload, path, "POST") - - - - for sonde in results["aggregations"]["2"]["buckets"]: - for frame in sonde["3"]["buckets"]: - try: - frame_data = frame["1"]["hits"]["hits"][0]["_source"] - uploaders = {} - - # Use subtype if it exists, else just use the basic type. - if "subtype" in frame_data: - _type = html.escape(frame_data["subtype"]) - else: - _type = html.escape(frame_data["type"]) - - data = { - "manufacturer": html.escape(frame_data['manufacturer']), - "type": html.escape(_type) - } - - if "temp" in frame_data: - data["temperature_external"] = frame_data["temp"] - - if "humidity" in frame_data: - data["humidity"] = frame_data["humidity"] - - if "pressure" in frame_data: - data["pressure"] = frame_data["pressure"] - - if "sats" in frame_data: - data["sats"] = frame_data["sats"] - - if "batt" in frame_data: - data["batt"] = frame_data["batt"] - - if "burst_timer" in frame_data: - data["burst_timer"] = frame_data["burst_timer"] - - if "frequency" in frame_data: - data["frequency"] = frame_data["frequency"] - - # May need to revisit this, if the resultant strings are too long. 
- if "xdata" in frame_data: - data["xdata"] = html.escape(frame_data["xdata"]) - - output["positions"]["position"].append( - { - "position_id": html.escape(f'{frame_data["serial"]}-{frame_data["datetime"]}'), - "mission_id": "0", - "vehicle": html.escape(frame_data["serial"]), - "server_time": html.escape(frame_data["datetime"]), - "gps_time": html.escape(frame_data["datetime"]), - "gps_lat": frame_data["lat"], - "gps_lon": frame_data["lon"], - "gps_alt": frame_data["alt"], - "gps_heading": frame_data["heading"] - if "heading" in frame_data - else "", - "gps_speed": frame_data["vel_h"] if "vel_h" in frame_data else "", - "type": html.escape(_type), - "picture": "", - "temp_inside": "", - "data": data, - "callsign": uploaders, - "sequence": "0", - } - ) - except: - traceback.print_exc(file=sys.stdout) - output["positions"]["position"][-1]["callsign"] = { - html.escape(x["_source"]['uploader_callsign']) : { - "snr" : x["_source"]["snr"] if "snr" in x["_source"] else None, - "rssi" : x["_source"]["rssi"] if "rssi" in x["_source"] else None - } - for x in frame["1"]["hits"]["hits"] - } - - # get chase cars - - payload = { - "timeout": "30s", - "aggs": { - "2": { - "terms": { - "field": "uploader_callsign.keyword", - "order": {"_key": "desc"}, - "size": 10000, - }, - "aggs": { - "3": { - "date_histogram": { - "field": "ts", - "fixed_interval": f"{str(interval)}s", - "min_doc_count": 1, - }, - "aggs": { - "1": { - "top_hits": { - "size": 1, - "sort": [{"ts": {"order": "desc"}}], - } - } - }, - } - }, - } - }, - "query": { - "bool": { - "filter": [ - {"match_all": {}}, - { - "match_phrase": { - "mobile": True - } - }, - { - "range": { - "ts": {"gte": gte.isoformat(), "lt": lt.isoformat()} - } - }, - ] - } - }, - } - - path = "listeners-*/_search" - - # {"position_id":"82159921","mission_id":"0","vehicle":"KB9RKU_chase", - # "server_time":"2021-04-09 06:28:55.109589","gps_time":"2021-04-09 06:28:54", - # "gps_lat":"41.539648333","gps_lon":"-89.111862667","gps_alt":"231.6","gps_heading":"", - # "gps_speed":"0","picture":"","temp_inside":"","data":{},"callsign":"","sequence":""} - if event["queryStringParameters"]["mode"] != "single": - results = es_request(payload, path, "POST") - - for car in results["aggregations"]["2"]["buckets"]: - for frame in car["3"]["buckets"]: - try: - frame_data = frame["1"]["hits"]["hits"][0]["_source"] - - - data = {} - # - output["positions"]["position"].append( - { - "position_id": html.escape(f'{frame_data["uploader_callsign"]}-{frame_data["ts"]}'), - "mission_id": "0", - "vehicle": html.escape(f'{frame_data["uploader_callsign"]}_chase'), - "server_time": html.escape(datetime.fromtimestamp(frame_data["ts"]/1000).isoformat()), - "gps_time": html.escape(datetime.fromtimestamp(frame_data["ts"]/1000).isoformat()), - "gps_lat": frame_data["uploader_position"][0], - "gps_lon": frame_data["uploader_position"][1], - "gps_alt": frame_data["uploader_position"][2], - "gps_heading": "", - "gps_speed": 0, - "picture": "", - "temp_inside": "", - "data": data, - "callsign": html.escape(frame_data["uploader_callsign"]), - "sequence": "", - } - ) - except: - traceback.print_exc(file=sys.stdout) - - output["positions"]["position"] = sorted( - output["positions"]["position"], key=lambda k: k["position_id"] - ) - compressed = BytesIO() - with gzip.GzipFile(fileobj=compressed, mode='w') as f: - json_response = json.dumps(output) - f.write(json_response.encode('utf-8')) - - gzippedResponse = compressed.getvalue() - return { - "body": base64.b64encode(gzippedResponse).decode(), - 
"isBase64Encoded": True, - "statusCode": 200, - "headers": { - "Content-Encoding": "gzip", - "content-type": "application/json" - } - - } - def get_listeners(event, context): @@ -767,7 +453,12 @@ def es_request(payload, path, method): session = boto3.Session() params = json.dumps(payload) - headers = {"Host": HOST, "Content-Type": "application/json"} + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + f.write(params.encode('utf-8')) + params = compressed.getvalue() + + headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"} request = AWSRequest( method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers ) @@ -779,7 +470,7 @@ def es_request(payload, path, method): if __name__ == "__main__": - print(get_sondes({"queryStringParameters":{"lat":"-32.7933","lon":"151.8358","distance":"5000", "last":"604800"}}, {})) + #print(get_sondes({"queryStringParameters":{"lat":"-32.7933","lon":"151.8358","distance":"5000", "last":"604800"}}, {})) # mode: 6hours # type: positions # format: json @@ -798,11 +489,11 @@ if __name__ == "__main__": # {}, # ) # ) - # print( - # get_listeners( - # {},{} - # ) - # ) + print( + get_listeners( + {},{} + ) + ) # print ( # get_chase( # {"queryStringParameters": { diff --git a/recovered/lambda_function.py b/recovered/lambda_function.py index 951555b..172c5fb 100644 --- a/recovered/lambda_function.py +++ b/recovered/lambda_function.py @@ -9,6 +9,8 @@ import zlib import base64 import datetime import os +from io import BytesIO +import gzip HOST = os.getenv("ES") @@ -18,7 +20,13 @@ def es_request(payload, path, method): session = boto3.Session() params = json.dumps(payload) - headers = {"Host": HOST, "Content-Type": "application/json"} + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + f.write(params.encode('utf-8')) + params = compressed.getvalue() + + headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"} + request = AWSRequest( method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers ) diff --git a/sqs-to-elk/lambda_function.py b/sqs-to-elk/lambda_function.py index 3ea2ca8..45756b8 100644 --- a/sqs-to-elk/lambda_function.py +++ b/sqs-to-elk/lambda_function.py @@ -5,14 +5,19 @@ from botocore.auth import SigV4Auth import boto3 import botocore.credentials import os - +from io import BytesIO +import gzip +null = None HOST = os.getenv("ES") def es_request(payload, path, method): # get aws creds session = boto3.Session() - - headers = {"Host": HOST, "Content-Type": "application/json"} + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + f.write(payload.encode('utf-8')) + payload = compressed.getvalue() + headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"} request = AWSRequest( method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers ) @@ -20,6 +25,8 @@ def es_request(payload, path, method): session = URLLib3Session() r = session.send(request.prepare()) + if r.status_code != 200: + raise RuntimeError return json.loads(r.text) @@ -52,4 +59,4 @@ def lambda_handler(event, context): if error_types: print(event) print(result) - raise RuntimeError + raise RuntimeError \ No newline at end of file diff --git a/sqs_to_s3.tf b/sqs_to_s3.tf deleted file mode 100644 index 153018e..0000000 --- a/sqs_to_s3.tf +++ /dev/null @@ -1,143 +0,0 @@ -data "archive_file" "sqs_to_s3" { - type = "zip" - source_dir = "sonde-to-s3/" - output_path = 
"${path.module}/build/sonde-to-s3.zip" -} - -resource "aws_iam_role" "sqs_to_s3" { - path = "/service-role/" - name = "sqs_to_s3" - assume_role_policy = <