From 27824549afadfe5f7a9519736479b6b613f92a5f Mon Sep 17 00:00:00 2001 From: Michaela Date: Mon, 24 May 2021 16:54:41 +1000 Subject: [PATCH] change async and remove alt requirement on listeners --- query/lambda_function.py | 33 +++++++++++------------ sonde-api-to-iot-core/lambda_function.py | 34 +++++++++++++++++------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/query/lambda_function.py b/query/lambda_function.py index 8a62bc9..0867eb9 100644 --- a/query/lambda_function.py +++ b/query/lambda_function.py @@ -514,7 +514,6 @@ def get_listeners(event, context): "filter": [ {"match_all": {}}, {"exists": {"field": "uploader_position_elk"},}, - {"exists": {"field": "uploader_alt"},}, {"exists": {"field": "uploader_antenna.keyword"},}, {"exists": {"field": "software_name.keyword"},}, {"exists": {"field": "software_version.keyword"},}, @@ -563,7 +562,7 @@ def get_listeners(event, context): .replace(" ", "") .split(",")[0] ), - "alt": float(listener["1"]["hits"]["hits"][0]["fields"]["uploader_alt"][0]), + "alt": float(listener["1"]["hits"]["hits"][0]["fields"]["uploader_alt"][0]) if "uploader_alt" in listener["1"]["hits"]["hits"][0]["fields"] else 0, "description": f"""\n
\n Radio: {html.escape(listener["1"]["hits"]["hits"][0]["fields"]["software_name.keyword"][0])}-{html.escape(listener["1"]["hits"]["hits"][0]["fields"]["software_version.keyword"][0])}
\n @@ -601,21 +600,21 @@ if __name__ == "__main__": # max_positions: 0 # position_id: 0 # vehicles: RS_*;*chase - print( - datanew( - { - "queryStringParameters": { - "type": "positions", - "mode": "12hours", - "position_id": "0", - "vehicles": "" - } - }, - {}, - ) - ) # print( - # get_listeners( - # {},{} + # datanew( + # { + # "queryStringParameters": { + # "type": "positions", + # "mode": "12hours", + # "position_id": "0", + # "vehicles": "" + # } + # }, + # {}, # ) # ) + print( + get_listeners( + {},{} + ) + ) diff --git a/sonde-api-to-iot-core/lambda_function.py b/sonde-api-to-iot-core/lambda_function.py index 44214a0..97b02fa 100644 --- a/sonde-api-to-iot-core/lambda_function.py +++ b/sonde-api-to-iot-core/lambda_function.py @@ -16,6 +16,18 @@ import os import asyncio import aioboto3 +import asyncio +from functools import wraps, partial + +def async_wrap(func): + @wraps(func) + async def run(*args, loop=None, executor=None, **kwargs): + if loop is None: + loop = asyncio.get_event_loop() + pfunc = partial(func, *args, **kwargs) + return await loop.run_in_executor(executor, pfunc) + return run + # this needs a bunch of refactor but the general approach is # connect to mqtt via websockets during init # if we detect that we are disconnected then reconnect @@ -36,7 +48,7 @@ event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) -io.init_logging(io.LogLevel.Error, "stderr") +#io.init_logging(io.LogLevel.Error, "stderr") def connect(): @@ -60,13 +72,19 @@ def connect(): connect() +sns = boto3.client("sns",region_name="us-east-1") +sns.meta.events.register('request-created.dynamodb', set_connection_header) - +@async_wrap +def post(payload): + sns.publish( + TopicArn=os.getenv("SNS_TOPIC"), + Message=json.dumps(payload) + ) async def upload(event, context): - global connect_future, mqtt_connection - async with aioboto3.client("sns",region_name="us-east-1") as sns: - sns.meta.events.register('request-created.dynamodb', set_connection_header) + global connect_future, mqtt_connection + tasks = [] # Future.result() waits until a result is available try: @@ -109,16 +127,12 @@ async def upload(event, context): payload["uploader_position"][2], f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}", ) - tasks.append(sns.publish( - TopicArn=os.getenv("SNS_TOPIC"), - Message=json.dumps(payload) - )) + tasks.append(post(payload)) (msg, x) = mqtt_connection.publish( topic=f'sondes/{payload["serial"]}', payload=json.dumps(payload), qos=mqtt.QoS.AT_MOST_ONCE, ) - try: msg.result() except (RuntimeError, AwsCrtError):