diff --git a/sonde-api-to-iot-core/lambda_function.py b/sonde-api-to-iot-core/lambda_function.py index 829efcd..40bf559 100644 --- a/sonde-api-to-iot-core/lambda_function.py +++ b/sonde-api-to-iot-core/lambda_function.py @@ -1,27 +1,14 @@ import sys -sys.path.append("vendor/lib/python3.9/site-packages") import json import boto3 import zlib import base64 import datetime import functools -from awscrt import io, mqtt, auth, http -from awscrt.exceptions import AwsCrtError -from awsiot import mqtt_connection_builder -import uuid -import threading from email.utils import parsedate import os -import asyncio -# this needs a bunch of refactor but the general approach is -# connect to mqtt via websockets during init -# if we detect that we are disconnected then reconnect -# this is to make the lambda function nice and quick when during -# peak load - # todo # we should add some value checking # we typically perform version banning here based on user agent @@ -32,33 +19,8 @@ import asyncio def set_connection_header(request, operation_name, **kwargs): request.headers['Connection'] = 'keep-alive' -event_loop_group = io.EventLoopGroup(1) -host_resolver = io.DefaultHostResolver(event_loop_group) - - -def connect(): - global connect_future, mqtt_connection - session = boto3.session.Session() - client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) - credentials_provider = auth.AwsCredentialsProvider.new_default_chain( - client_bootstrap - ) - mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing( - endpoint=os.getenv("IOT_ENDPOINT"), - client_bootstrap=client_bootstrap, - region="us-east-1", - credentials_provider=credentials_provider, - client_id=str(uuid.uuid4()), - clean_session=False, - keep_alive_secs=6, - ) - connect_future = mqtt_connection.connect() - connect_future.result() - - -connect() sns = boto3.client("sns",region_name="us-east-1") -sns.meta.events.register('request-created.dynamodb', set_connection_header) +sns.meta.events.register('request-created.sns', set_connection_header) def post(payload): sns.publish( @@ -66,16 +28,7 @@ def post(payload): Message=json.dumps(payload) ) -async def upload(event, context): - global connect_future, mqtt_connection - - tasks = [] - # Future.result() waits until a result is available - try: - connect_future.result() - except: - connect() - +def upload(event, context): if "isBase64Encoded" in event and event["isBase64Encoded"] == True: event["body"] = base64.b64decode(event["body"]) if ( @@ -118,25 +71,9 @@ async def upload(event, context): f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}", ) to_sns.append(payload) - (msg, x) = mqtt_connection.publish( - topic=f'sondes/{payload["serial"]}', - payload=json.dumps(payload), - qos=mqtt.QoS.AT_MOST_ONCE, - ) - try: - msg.result() - except (RuntimeError, AwsCrtError): - connect() - (msg, x) = mqtt_connection.publish( - topic=f'sondes/{payload["serial"]}', - payload=json.dumps(payload), - qos=mqtt.QoS.AT_MOST_ONCE, - ) - msg.result() - post(to_sns) def lambda_handler(event, context): - asyncio.run(upload(event, context)) + upload(event, context) return {"statusCode": 200, "body": "^v^ telm logged"} if __name__ == "__main__": diff --git a/swagger.yaml b/swagger.yaml index 3d70af9..12ce52c 100644 --- a/swagger.yaml +++ b/swagger.yaml @@ -252,7 +252,64 @@ paths: items: type: object - + /recovered: + put: + summary: Adds a recovery object to the SondeHub database to indicate if a radiosonde was recovered + consumes: + - "application/json" + produces: + - "text/plain" + parameters: + - in: header + name: Date + description: , :: UTC as per RFC7231. This is used to calculate receiver time offset for correcting clients that have the incorrect time. + required: true + type: string + format: date-time + - in: header + name: User-Agent + type: string + description: "The software and version performing the telemetry upload, eg: `autorx-1.4.1-beta5`" + - in: body + required: true + name: body + schema: + $ref: "#/definitions/recovery_object" + + responses: + 200: + description: Recovery logged + 500: + description: Other Server error (including malformed data submissions) + get: + summary: Request Recovery Data + description: > + Use this to get the recovery data + produces: + - "application/json" + parameters: + - in: query + name: lat + type: number + description: "Latitude - if specified, lon and distance are required. Eg: -34.9285" + - in: query + name: lon + description: "Longitude - if specified, lat and distance are required Eg: 138.6007" + type: number + - in: query + name: distance + description: "Distance in meters - if specified, lat and lon are required" + type: number + - in: query + name: last + description: "How far back to search in seconds. Defaults to 14 days" + type: number + + responses: + 200: + description: Returns a list of recovery objects + schema: + $ref: "#/definitions/recovery_results_format" parameters: input_payloads: in: body @@ -262,7 +319,50 @@ parameters: description: SondeHub telemetry format items: $ref: "#/definitions/telemetry_format" + recovery_object: + in: body + required: true + name: body + schema: + description: SondeHub recovery format + items: + $ref: "#/definitions/recovery_object" + definitions: + recovery_object: + type: object + required: + - serial + - lat + - lon + - alt + - recovered_by + - recovered + properties: + serial: + description: Serial number of the radiosonde + type: string + lat: + description: Latitude (decimal degrees) of the recovery location + type: "number" + format: "double" + lon: + description: Longitude (decimal degrees) of the recovery location + type: "number" + format: "double" + alt: + description: Altitude (metres) of the recovery location + type: "number" + format: "double" + recovered: + description: was this recovery attempt was successful + type: boolean + recovered_by: + description: callsign or name of the person who recovered the sonde + type: string + description: + description: Description of the recovery effort + type: string sonde_query_results_format: type: object properties: @@ -271,6 +371,10 @@ definitions: properties: datetime: $ref: "#/definitions/telemetry_format" + recovery_results_format: + type: array + items: + $ref: "#/definitions/recovery_object" telemetry_format: description: SondeHub telemetry format type: "object"