From 81b85fb05e1b5b2715f6197e3394d27536daaff8 Mon Sep 17 00:00:00 2001 From: Michaela Date: Mon, 24 May 2021 20:44:29 +1000 Subject: [PATCH] Batch up payloads in sns messages to reduce SQS and SNS spend --- sonde-api-to-iot-core/lambda_function.py | 124 ++++++++++------------- sonde-to-s3/lambda_function.py | 42 ++++---- sqs-to-elk/lambda_function.py | 17 ++-- 3 files changed, 87 insertions(+), 96 deletions(-) diff --git a/sonde-api-to-iot-core/lambda_function.py b/sonde-api-to-iot-core/lambda_function.py index 97b02fa..ab68970 100644 --- a/sonde-api-to-iot-core/lambda_function.py +++ b/sonde-api-to-iot-core/lambda_function.py @@ -14,19 +14,7 @@ import threading from email.utils import parsedate import os import asyncio -import aioboto3 -import asyncio -from functools import wraps, partial - -def async_wrap(func): - @wraps(func) - async def run(*args, loop=None, executor=None, **kwargs): - if loop is None: - loop = asyncio.get_event_loop() - pfunc = partial(func, *args, **kwargs) - return await loop.run_in_executor(executor, pfunc) - return run # this needs a bunch of refactor but the general approach is # connect to mqtt via websockets during init @@ -48,9 +36,6 @@ event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) -#io.init_logging(io.LogLevel.Error, "stderr") - - def connect(): global connect_future, mqtt_connection session = boto3.session.Session() @@ -75,7 +60,6 @@ connect() sns = boto3.client("sns",region_name="us-east-1") sns.meta.events.register('request-created.dynamodb', set_connection_header) -@async_wrap def post(payload): sns.publish( TopicArn=os.getenv("SNS_TOPIC"), @@ -83,68 +67,68 @@ def post(payload): ) async def upload(event, context): - global connect_future, mqtt_connection - - tasks = [] - # Future.result() waits until a result is available + global connect_future, mqtt_connection + + tasks = [] + # Future.result() waits until a result is available + try: + connect_future.result() + except: + connect() + + if "isBase64Encoded" in event and event["isBase64Encoded"] == True: + event["body"] = base64.b64decode(event["body"]) + if ( + "content-encoding" in event["headers"] + and event["headers"]["content-encoding"] == "gzip" + ): + event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS) + time_delta = None + if "date" in event["headers"]: try: - connect_future.result() + time_delta_header = event["headers"]["date"] + time_delta = ( + datetime.datetime(*parsedate(time_delta_header)[:7]) + - datetime.datetime.utcnow() + ).total_seconds() except: + pass + payloads = json.loads(event["body"]) + to_sns = [] + first = False + for payload in payloads: + if "user-agent" in event["headers"]: + event["time_server"] = datetime.datetime.now().isoformat() + payload["user-agent"] = event["headers"]["user-agent"] + payload["position"] = f'{payload["lat"]},{payload["lon"]}' + if time_delta: + payload["upload_time_delta"] = time_delta + if "uploader_position" in payload: + if not payload["uploader_position"]: + payload.pop("uploader_position") + else: + (payload["uploader_alt"], payload["uploader_position"]) = ( + payload["uploader_position"][2], + f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}", + ) + to_sns.append(payload) + (msg, x) = mqtt_connection.publish( + topic=f'sondes/{payload["serial"]}', + payload=json.dumps(payload), + qos=mqtt.QoS.AT_MOST_ONCE, + ) + try: + msg.result() + except (RuntimeError, AwsCrtError): connect() - - if "isBase64Encoded" in event and event["isBase64Encoded"] == True: - event["body"] = base64.b64decode(event["body"]) - if ( - "content-encoding" in event["headers"] - and event["headers"]["content-encoding"] == "gzip" - ): - event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS) - time_delta = None - if "date" in event["headers"]: - try: - time_delta_header = event["headers"]["date"] - time_delta = ( - datetime.datetime(*parsedate(time_delta_header)[:7]) - - datetime.datetime.utcnow() - ).total_seconds() - except: - pass - payloads = json.loads(event["body"]) - - first = False - for payload in payloads: - if "user-agent" in event["headers"]: - event["time_server"] = datetime.datetime.now().isoformat() - payload["user-agent"] = event["headers"]["user-agent"] - payload["position"] = f'{payload["lat"]},{payload["lon"]}' - if time_delta: - payload["upload_time_delta"] = time_delta - if "uploader_position" in payload: - if not payload["uploader_position"]: - payload.pop("uploader_position") - else: - (payload["uploader_alt"], payload["uploader_position"]) = ( - payload["uploader_position"][2], - f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}", - ) - tasks.append(post(payload)) (msg, x) = mqtt_connection.publish( topic=f'sondes/{payload["serial"]}', payload=json.dumps(payload), qos=mqtt.QoS.AT_MOST_ONCE, ) - try: - msg.result() - except (RuntimeError, AwsCrtError): - connect() - (msg, x) = mqtt_connection.publish( - topic=f'sondes/{payload["serial"]}', - payload=json.dumps(payload), - qos=mqtt.QoS.AT_MOST_ONCE, - ) - msg.result() - - await asyncio.gather(*tasks) + msg.result() + + post(to_sns) def lambda_handler(event, context): asyncio.run(upload(event, context)) return {"statusCode": 200, "body": "^v^ telm logged"} diff --git a/sonde-to-s3/lambda_function.py b/sonde-to-s3/lambda_function.py index b442732..7d2e17e 100644 --- a/sonde-to-s3/lambda_function.py +++ b/sonde-to-s3/lambda_function.py @@ -45,25 +45,29 @@ async def upload(event, context): payloads = {} for record in event['Records']: sns_message = json.loads(record["body"]) - payload = json.loads(sns_message["Message"]) + if type(json.loads(sns_message["Message"])) == dict: + incoming_payloads = [json.loads(sns_message["Message"])] + else: + incoming_payloads = json.loads(sns_message["Message"]) + for payload in incoming_payloads: + + body = json.dumps(payload) + id = str(uuid.uuid4()) + hash = hashlib.sha256(values_to_hash(payload).encode("utf-8")).hexdigest() - body = json.dumps(payload) - id = str(uuid.uuid4()) - hash = hashlib.sha256(values_to_hash(payload).encode("utf-8")).hexdigest() - - filenames = [ - f"date/{payload['datetime']}-{payload['serial']}-{id}.json", - f"serial/{payload['serial']}/{payload['datetime']}-{id}.json", - f"serial-hashed/{payload['serial']}/{payload['datetime']}-{hash}.json" - ] - - for filename in filenames: - tasks.append(s3.put_object( - ACL="public-read", - Bucket=BUCKET, - Body=body, - Key=filename - )) + filenames = [ + f"date/{payload['datetime']}-{payload['serial']}-{id}.json", + f"serial/{payload['serial']}/{payload['datetime']}-{id}.json", + f"serial-hashed/{payload['serial']}/{payload['datetime']}-{hash}.json" + ] + + for filename in filenames: + tasks.append(s3.put_object( + ACL="public-read", + Bucket=BUCKET, + Body=body, + Key=filename + )) await asyncio.gather(*tasks) @@ -79,7 +83,7 @@ if __name__ == "__main__": { "messageId": "262d4090-e23b-4907-b677-3c94334dc899", "receiptHandle": "AQEBL1FXHS4m+Om59KZH9ayxC5VBqDEDh6DgXUZuBhV2uQJS312bhOTpLvptuCCIWaeLkfHU+7NajqV2kTVhnz5lehE/zfQ8OU1jqqm+cHxyul99MxA7K7+C+ww2Ri9KSbgaAgqvZzcLbwpW8rP0MNhrBcIQAE5Pz1urfTZKx1RVnv/XQHbR2ARPwocOzk2yEexa0y2f7FedS4F10gju8Ypp0Zr4DSRb1zUkES3QJGiSJakaO1QJT5npRySjAd0CUSPXw7IDTejolfGkItQG5eMRx0enELTUDv8LPsHJkr7ha3DHNfbvxTtdk406nWFn8U8DW515emp7+Y+AD469OnceIMdVC62GHwrpMkedXzLEH0C8TOXHQ+WuRkhR1dauwKqO", - "body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"65147554-e06d-5324-a87d-2da107fea807\",\n \"TopicArn\" : \"arn:aws:sns:us-east-1:143841941773:sonde-telem\",\n \"Message\" : \"{\\\"software_name\\\":\\\"radiosonde_auto_rx\\\",\\\"software_version\\\":\\\"1.5.1\\\",\\\"uploader_callsign\\\":\\\"BIOWL1\\\",\\\"uploader_position\\\":\\\"52.014417,8.47351\\\",\\\"uploader_antenna\\\":\\\"SirioCX395\\\",\\\"time_received\\\":\\\"2021-04-18T07:52:37.196266Z\\\",\\\"datetime\\\":\\\"2021-04-18T07:52:53.001000Z\\\",\\\"manufacturer\\\":\\\"Vaisala\\\",\\\"type\\\":\\\"RS41\\\",\\\"serial\\\":\\\"meowmeowtest\\\",\\\"subtype\\\":\\\"RS41-SGP\\\",\\\"frame\\\":12781,\\\"lat\\\":50.65064,\\\"lon\\\":6.60805,\\\"alt\\\":2954.44289,\\\"temp\\\":-9.3,\\\"humidity\\\":75.4,\\\"pressure\\\":709.79,\\\"vel_v\\\":-2.85326,\\\"vel_h\\\":8.53055,\\\"heading\\\":236.0122,\\\"sats\\\":9,\\\"batt\\\":2.7,\\\"frequency\\\":405.3,\\\"burst_timer\\\":25423,\\\"snr\\\":12.5,\\\"user-agent\\\":\\\"Amazon CloudFront\\\",\\\"position\\\":\\\"50.65064,6.60805\\\",\\\"upload_time_delta\\\":-0.713689,\\\"uploader_alt\\\":340}\",\n \"Timestamp\" : \"2021-04-18T07:52:51.776Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"qXuYwDAGPYYLjKXfDtF69AWKDEhhz9MXlqxO2nBwJ/dgOqNSUZtDPqOYSuge3jVCoTSRY5qGw38gg2G+JnEbJd8SVvp9GRsFre8MKWu8T0obq3rj8S0YAh7dTqi4EILIMmi2KziasCDQlrVuZvCSgPnC+hYF3GByI626QW6m3a4E2igclvbE+O6x6qvVDKwmf/eh+8LRiH1PCrEckiXthnr+qOCiTcstyZoOqMOShJBun9k0DK07+Yf1tYDPSHnqZSIaOvAMSjIKKXfGCkel3SWieO7Zgk7xQuo9Z1bcV8Miu4uEvge4G9HKU3S41zaVcQjYvEhQLxxgd1x3HxXImA==\",\n \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:143841941773:sonde-telem:1a52ac41-6e17-43da-bfb6-114577c94ca6\"\n}", + "body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"65147554-e06d-5324-a87d-2da107fea807\",\n \"TopicArn\" : \"arn:aws:sns:us-east-1:143841941773:sonde-telem\",\n \"Message\" : \"[{\\\"software_name\\\":\\\"radiosonde_auto_rx\\\",\\\"software_version\\\":\\\"1.5.1\\\",\\\"uploader_callsign\\\":\\\"BIOWL1\\\",\\\"uploader_position\\\":\\\"52.014417,8.47351\\\",\\\"uploader_antenna\\\":\\\"SirioCX395\\\",\\\"time_received\\\":\\\"2021-04-18T07:52:37.196266Z\\\",\\\"datetime\\\":\\\"2021-04-18T07:52:53.001000Z\\\",\\\"manufacturer\\\":\\\"Vaisala\\\",\\\"type\\\":\\\"RS41\\\",\\\"serial\\\":\\\"meowmeowtest\\\",\\\"subtype\\\":\\\"RS41-SGP\\\",\\\"frame\\\":12781,\\\"lat\\\":50.65064,\\\"lon\\\":6.60805,\\\"alt\\\":2954.44289,\\\"temp\\\":-9.3,\\\"humidity\\\":75.4,\\\"pressure\\\":709.79,\\\"vel_v\\\":-2.85326,\\\"vel_h\\\":8.53055,\\\"heading\\\":236.0122,\\\"sats\\\":9,\\\"batt\\\":2.7,\\\"frequency\\\":405.3,\\\"burst_timer\\\":25423,\\\"snr\\\":12.5,\\\"user-agent\\\":\\\"Amazon CloudFront\\\",\\\"position\\\":\\\"50.65064,6.60805\\\",\\\"upload_time_delta\\\":-0.713689,\\\"uploader_alt\\\":340}]\",\n \"Timestamp\" : \"2021-04-18T07:52:51.776Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"qXuYwDAGPYYLjKXfDtF69AWKDEhhz9MXlqxO2nBwJ/dgOqNSUZtDPqOYSuge3jVCoTSRY5qGw38gg2G+JnEbJd8SVvp9GRsFre8MKWu8T0obq3rj8S0YAh7dTqi4EILIMmi2KziasCDQlrVuZvCSgPnC+hYF3GByI626QW6m3a4E2igclvbE+O6x6qvVDKwmf/eh+8LRiH1PCrEckiXthnr+qOCiTcstyZoOqMOShJBun9k0DK07+Yf1tYDPSHnqZSIaOvAMSjIKKXfGCkel3SWieO7Zgk7xQuo9Z1bcV8Miu4uEvge4G9HKU3S41zaVcQjYvEhQLxxgd1x3HxXImA==\",\n \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:143841941773:sonde-telem:1a52ac41-6e17-43da-bfb6-114577c94ca6\"\n}", "attributes": { "ApproximateReceiveCount": "2", "SentTimestamp": "1618732371814", diff --git a/sqs-to-elk/lambda_function.py b/sqs-to-elk/lambda_function.py index 46d10e6..3ea2ca8 100644 --- a/sqs-to-elk/lambda_function.py +++ b/sqs-to-elk/lambda_function.py @@ -27,13 +27,17 @@ def lambda_handler(event, context): payloads = {} for record in event['Records']: sns_message = json.loads(record["body"]) - payload = json.loads(sns_message["Message"]) - index = payload['datetime'][:7] - - if index not in payloads: # create index if not exists - payloads[index] = [] + if type(json.loads(sns_message["Message"])) == dict: + incoming_payloads = [json.loads(sns_message["Message"])] + else: + incoming_payloads = json.loads(sns_message["Message"]) + for payload in incoming_payloads: + index = payload['datetime'][:7] - payloads[index].append(payload) + if index not in payloads: # create index if not exists + payloads[index] = [] + + payloads[index].append(payload) for index in payloads: body="" @@ -49,4 +53,3 @@ def lambda_handler(event, context): print(event) print(result) raise RuntimeError - \ No newline at end of file