diff --git a/lambda/sonde_api_to_iot_core/__init__.py b/lambda/sonde_api_to_iot_core/__init__.py index e12ef19..21150bd 100644 --- a/lambda/sonde_api_to_iot_core/__init__.py +++ b/lambda/sonde_api_to_iot_core/__init__.py @@ -20,6 +20,12 @@ from io import BytesIO logs = boto3.client('logs') sequenceToken = None +def set_connection_header(request, operation_name, **kwargs): + request.headers['Connection'] = 'keep-alive' + +sns = boto3.client("sns",region_name="us-east-1") +sns.meta.events.register('request-created.sns', set_connection_header) + def handle_error(message, event, stream_name): global sequenceToken print(message) @@ -296,11 +302,24 @@ def telemetry_filter(telemetry): if telemetry["software_name"] == "radiosonde_auto_rx": if parse_autorx_version(telemetry["software_version"]) < (1,5,9): return (False,f"Autorx version is out of date and doesn't handle iMet-1 and iMet-4 radiosondes correctly. Please update to 1.5.9 or later") + if "M10" in telemetry["type"]: + if telemetry["software_name"] == "dxlAPRS-SHUE": + if parse_dxlaprs_shue_version(telemetry["software_version"]) < (1,1,2): + return (False, f"dxlAPRS-SHUE versions below 1.1.2 due not send correct serial numbers for M10 radiosondes. Please update to 1.1.2 or later") if "DFM" in telemetry["type"]: if telemetry["software_name"] == "SondeMonitor": if parse_sondemonitor_version(telemetry["software_version"]) < (6,2,8,7): return (False,f"SondeMonitor version is out of date and doesn't handle DFM radiosondes correctly. Please update to 6.2.8.7 or later") - + if telemetry["software_name"] == "rdzTTGOsonde": + ttgo_branch, ttgo_version = parse_rdz_ttgo_version(telemetry["software_version"]) + if ttgo_branch == "devel": + if ttgo_version < (20230427,0,0): + return (False,f"rdzTTGOsonde version is out of date and doesn't handle DFM radiosondes correctly. Please update to master 0.9.3, devel20230427 or later") + elif ttgo_branch == "master": + if ttgo_version < (0,9,3): + return (False,f"rdzTTGOsonde version is out of date and doesn't handle DFM radiosondes correctly. Please update to master 0.9.3, devel20230427 or later") + else: + return (False,f"rdzTTGOsonde branch and version was unable to be determined. We are unsure if this version handles DFM sondes correctly. Please update to master 0.9.3, devel20230427 or later") # block callsigns if telemetry["uploader_callsign"] in ["M00ON-5", "LEKUKU", "BS144", "Carlo-12", "GAB1", "FEJ-5", "KR001"]: return (False, "Something is wrong with the data your station is uploading, please contact us so we can resolve what is going on. support@sondehub.org") @@ -324,11 +343,22 @@ def parse_autorx_version(version): except: return (0,0,0) -def set_connection_header(request, operation_name, **kwargs): - request.headers['Connection'] = 'keep-alive' +def parse_rdz_ttgo_version(version): + try: + # RDZ TTGO has two branches, master and devel, however there are also a bunch of other custom versions + # devel20230829, master_v0.9.3, master_v0.9.2, devel20230427, devLZ20230812, devel20230829.NE, Alex_ver_2.7_M, multich_v3 + # in the cases that don't match develxxxx or master_vxxxx format we'll give a 0,0,0 version here + m = re.search(r'([a-zA-Z_]+?)_?v?(\d+)(?:\.(\d+))?(?:\.(\d+))?', version) + return (m.groups()[0],tuple([int(x if x != None else 0) for x in m.groups()[1:]])) + except: + return ("unknown", (0,0,0)) -sns = boto3.client("sns",region_name="us-east-1") -sns.meta.events.register('request-created.sns', set_connection_header) +def parse_dxlaprs_shue_version(version): + try: + m = re.search(r'(\d+)\.(\d+)(?:\.(\d+))?', version) + return tuple([int(x if x != None else 0) for x in m.groups()]) + except: + return (0,0,0) def post(payload): compressed = BytesIO() diff --git a/lambda/sonde_api_to_iot_core/__main__.py b/lambda/sonde_api_to_iot_core/__main__.py index bb576af..b96bbd7 100644 --- a/lambda/sonde_api_to_iot_core/__main__.py +++ b/lambda/sonde_api_to_iot_core/__main__.py @@ -1,10 +1,22 @@ from . import * + + +import unittest +from unittest.mock import MagicMock + +# mock out context +class fakeContext: + def __init__(self): + self.log_stream_name = str(uuid.uuid4()) + import json import base64 import gzip import uuid -body = [{ - "dev":True, +import datetime +import copy + +example_body = [{ "software_name": "SondeHubUploader", "software_version": "1.0.0", "uploader_callsign": "a", @@ -28,53 +40,148 @@ body = [{ "rssi": 70.9 }] -compressed = BytesIO() -with gzip.GzipFile(fileobj=compressed, mode='w') as f: - f.write(json.dumps(body).encode('utf-8')) -compressed.seek(0) -bbody = base64.b64encode(compressed.read()).decode("utf-8") -payload = compressed.getvalue() -payload = { - "version": "2.0", - "routeKey": "PUT /sondes/telemetry", - "rawPath": "/sondes/telemetry", - "rawQueryString": "", - "headers": { - "accept": "*/*", - "accept-encoding": "gzip, deflate", - "content-encoding": "gzip", - "content-length": "2135", - "content-type": "application/json", - "host": "api.v2.sondehub.org", - "user-agent": "autorx-1.4.1-beta4", - "x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a", - "x-forwarded-for": "103.107.130.22", - "x-forwarded-port": "443", - "x-forwarded-proto": "https", - "date": "Sun, 31 Jan 2021 00:21:45 GMT", - }, - "requestContext": { - "accountId": "143841941773", - "apiId": "r03szwwq41", - "domainName": "api.v2.sondehub.org", - "domainPrefix": "api", - "http": { - "method": "PUT", - "path": "/sondes/telemetry", - "protocol": "HTTP/1.1", - "sourceIp": "103.107.130.22", - "userAgent": "autorx-1.4.1-beta4", - }, - "requestId": "Z_NJvh0RoAMEJaw=", +def compress_payload(payload): + compressed = BytesIO() + with gzip.GzipFile(fileobj=compressed, mode='w') as f: + f.write(json.dumps(payload).encode('utf-8')) + compressed.seek(0) + bbody = base64.b64encode(compressed.read()).decode("utf-8") + output = { + "version": "2.0", "routeKey": "PUT /sondes/telemetry", - "stage": "$default", - "time": "31/Jan/2021:00:10:25 +0000", - "timeEpoch": 1612051825409, - }, - "body": bbody, - "isBase64Encoded": True, -} -class fakeContext: - def __init__(self): - self.log_stream_name = str(uuid.uuid4()) -print(lambda_handler(payload, fakeContext())) + "rawPath": "/sondes/telemetry", + "rawQueryString": "", + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate", + "content-encoding": "gzip", + "content-length": "2135", + "content-type": "application/json", + "host": "api.v2.sondehub.org", + "user-agent": "autorx-1.4.1-beta4", + "x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a", + "x-forwarded-for": "103.107.130.22", + "x-forwarded-port": "443", + "x-forwarded-proto": "https", + "date": "Sun, 31 Jan 2021 00:21:45 GMT", + }, + "requestContext": { + "accountId": "143841941773", + "apiId": "r03szwwq41", + "domainName": "api.v2.sondehub.org", + "domainPrefix": "api", + "http": { + "method": "PUT", + "path": "/sondes/telemetry", + "protocol": "HTTP/1.1", + "sourceIp": "103.107.130.22", + "userAgent": "autorx-1.4.1-beta4", + }, + "requestId": "Z_NJvh0RoAMEJaw=", + "routeKey": "PUT /sondes/telemetry", + "stage": "$default", + "time": "31/Jan/2021:00:10:25 +0000", + "timeEpoch": 1612051825409, + }, + "body": bbody, + "isBase64Encoded": True, + } + return output + +logs.put_log_events = MagicMock(return_value={'nextSequenceToken':1}) +logs.create_log_stream = MagicMock(return_value={'nextSequenceToken':1}) + +class TestIngestion(unittest.TestCase): + def setUp(self): + sns.publish = MagicMock() # we reset the mock for every time so we can assert correctly if its called - this won't work when doing parallel testing + def test_report_time_too_late(self): + payload = copy.deepcopy(example_body) + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_not_called() + body_decode = json.loads(output["body"]) + self.assertEqual(body_decode["message"], "some or all payloads could not be processed") + self.assertEqual(body_decode["errors"][0]["error_message"],"Sonde reported time too far from current UTC time. Either sonde time or system time is invalid. (Threshold: 48 hours)") + def test_good_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_called() + self.assertEqual(output["body"], "^v^ telm logged") + self.assertEqual(output["statusCode"], 200) + + def test_good_ttgo_devel_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "rdzTTGOsonde" + payload[0]["software_version"] = "devel20230829" + payload[0]["type"] = "DFM" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_called() + self.assertEqual(output["body"], "^v^ telm logged") + self.assertEqual(output["statusCode"], 200) + + def test_good_ttgo_master_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "rdzTTGOsonde" + payload[0]["software_version"] = "master_v0.9.3" + payload[0]["type"] = "DFM" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_called() + self.assertEqual(output["body"], "^v^ telm logged") + self.assertEqual(output["statusCode"], 200) + def test_bad_ttgo_devel_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "rdzTTGOsonde" + payload[0]["software_version"] = "devel20230104" + payload[0]["type"] = "DFM" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_not_called() + body_decode = json.loads(output["body"]) + self.assertEqual(body_decode["message"], "some or all payloads could not be processed") + + def test_bad_ttgo_master_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "rdzTTGOsonde" + payload[0]["software_version"] = "master_v0.9.2" + payload[0]["type"] = "DFM" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_not_called() + body_decode = json.loads(output["body"]) + self.assertEqual(body_decode["message"], "some or all payloads could not be processed") + def test_weird_ttgo_branch_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "rdzTTGOsonde" + payload[0]["software_version"] = "multich_v3" + payload[0]["type"] = "DFM" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_not_called() + body_decode = json.loads(output["body"]) + self.assertEqual(body_decode["message"], "some or all payloads could not be processed") + def test_good_dxlaprsshue_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "dxlAPRS-SHUE" + payload[0]["software_version"] = "1.1.2" + payload[0]["type"] = "M10" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_called() + self.assertEqual(output["body"], "^v^ telm logged") + self.assertEqual(output["statusCode"], 200) + def test_bad_dxlaprsshue_payload(self): + payload = copy.deepcopy(example_body) + payload[0]["datetime"] = datetime.datetime.now().isoformat() + payload[0]["software_name"] = "dxlAPRS-SHUE" + payload[0]["software_version"] = "1.0.2" + payload[0]["type"] = "M10" + output = lambda_handler(compress_payload(payload), fakeContext()) + sns.publish.assert_not_called() + body_decode = json.loads(output["body"]) + self.assertEqual(body_decode["message"], "some or all payloads could not be processed") + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file