Add tests for ingestion, add blocks for M10 and DFM issues. Closes #121,#109

This commit is contained in:
xss 2023-10-18 10:11:43 +11:00
parent dc4c184835
commit 3e849d2606
2 changed files with 193 additions and 56 deletions

View File

@ -20,6 +20,12 @@ from io import BytesIO
logs = boto3.client('logs') logs = boto3.client('logs')
sequenceToken = None sequenceToken = None
def set_connection_header(request, operation_name, **kwargs):
request.headers['Connection'] = 'keep-alive'
sns = boto3.client("sns",region_name="us-east-1")
sns.meta.events.register('request-created.sns', set_connection_header)
def handle_error(message, event, stream_name): def handle_error(message, event, stream_name):
global sequenceToken global sequenceToken
print(message) print(message)
@ -296,11 +302,24 @@ def telemetry_filter(telemetry):
if telemetry["software_name"] == "radiosonde_auto_rx": if telemetry["software_name"] == "radiosonde_auto_rx":
if parse_autorx_version(telemetry["software_version"]) < (1,5,9): if parse_autorx_version(telemetry["software_version"]) < (1,5,9):
return (False,f"Autorx version is out of date and doesn't handle iMet-1 and iMet-4 radiosondes correctly. Please update to 1.5.9 or later") return (False,f"Autorx version is out of date and doesn't handle iMet-1 and iMet-4 radiosondes correctly. Please update to 1.5.9 or later")
if "M10" in telemetry["type"]:
if telemetry["software_name"] == "dxlAPRS-SHUE":
if parse_dxlaprs_shue_version(telemetry["software_version"]) < (1,1,2):
return (False, f"dxlAPRS-SHUE versions below 1.1.2 due not send correct serial numbers for M10 radiosondes. Please update to 1.1.2 or later")
if "DFM" in telemetry["type"]: if "DFM" in telemetry["type"]:
if telemetry["software_name"] == "SondeMonitor": if telemetry["software_name"] == "SondeMonitor":
if parse_sondemonitor_version(telemetry["software_version"]) < (6,2,8,7): if parse_sondemonitor_version(telemetry["software_version"]) < (6,2,8,7):
return (False,f"SondeMonitor version is out of date and doesn't handle DFM radiosondes correctly. Please update to 6.2.8.7 or later") return (False,f"SondeMonitor version is out of date and doesn't handle DFM radiosondes correctly. Please update to 6.2.8.7 or later")
if telemetry["software_name"] == "rdzTTGOsonde":
ttgo_branch, ttgo_version = parse_rdz_ttgo_version(telemetry["software_version"])
if ttgo_branch == "devel":
if ttgo_version < (20230427,0,0):
return (False,f"rdzTTGOsonde version is out of date and doesn't handle DFM radiosondes correctly. Please update to master 0.9.3, devel20230427 or later")
elif ttgo_branch == "master":
if ttgo_version < (0,9,3):
return (False,f"rdzTTGOsonde version is out of date and doesn't handle DFM radiosondes correctly. Please update to master 0.9.3, devel20230427 or later")
else:
return (False,f"rdzTTGOsonde branch and version was unable to be determined. We are unsure if this version handles DFM sondes correctly. Please update to master 0.9.3, devel20230427 or later")
# block callsigns # block callsigns
if telemetry["uploader_callsign"] in ["M00ON-5", "LEKUKU", "BS144", "Carlo-12", "GAB1", "FEJ-5", "KR001"]: if telemetry["uploader_callsign"] in ["M00ON-5", "LEKUKU", "BS144", "Carlo-12", "GAB1", "FEJ-5", "KR001"]:
return (False, "Something is wrong with the data your station is uploading, please contact us so we can resolve what is going on. support@sondehub.org") return (False, "Something is wrong with the data your station is uploading, please contact us so we can resolve what is going on. support@sondehub.org")
@ -324,11 +343,22 @@ def parse_autorx_version(version):
except: except:
return (0,0,0) return (0,0,0)
def set_connection_header(request, operation_name, **kwargs): def parse_rdz_ttgo_version(version):
request.headers['Connection'] = 'keep-alive' try:
# RDZ TTGO has two branches, master and devel, however there are also a bunch of other custom versions
# devel20230829, master_v0.9.3, master_v0.9.2, devel20230427, devLZ20230812, devel20230829.NE, Alex_ver_2.7_M, multich_v3
# in the cases that don't match develxxxx or master_vxxxx format we'll give a 0,0,0 version here
m = re.search(r'([a-zA-Z_]+?)_?v?(\d+)(?:\.(\d+))?(?:\.(\d+))?', version)
return (m.groups()[0],tuple([int(x if x != None else 0) for x in m.groups()[1:]]))
except:
return ("unknown", (0,0,0))
sns = boto3.client("sns",region_name="us-east-1") def parse_dxlaprs_shue_version(version):
sns.meta.events.register('request-created.sns', set_connection_header) try:
m = re.search(r'(\d+)\.(\d+)(?:\.(\d+))?', version)
return tuple([int(x if x != None else 0) for x in m.groups()])
except:
return (0,0,0)
def post(payload): def post(payload):
compressed = BytesIO() compressed = BytesIO()

View File

@ -1,10 +1,22 @@
from . import * from . import *
import unittest
from unittest.mock import MagicMock
# mock out context
class fakeContext:
def __init__(self):
self.log_stream_name = str(uuid.uuid4())
import json import json
import base64 import base64
import gzip import gzip
import uuid import uuid
body = [{ import datetime
"dev":True, import copy
example_body = [{
"software_name": "SondeHubUploader", "software_name": "SondeHubUploader",
"software_version": "1.0.0", "software_version": "1.0.0",
"uploader_callsign": "a", "uploader_callsign": "a",
@ -28,53 +40,148 @@ body = [{
"rssi": 70.9 "rssi": 70.9
}] }]
compressed = BytesIO() def compress_payload(payload):
with gzip.GzipFile(fileobj=compressed, mode='w') as f: compressed = BytesIO()
f.write(json.dumps(body).encode('utf-8')) with gzip.GzipFile(fileobj=compressed, mode='w') as f:
compressed.seek(0) f.write(json.dumps(payload).encode('utf-8'))
bbody = base64.b64encode(compressed.read()).decode("utf-8") compressed.seek(0)
payload = compressed.getvalue() bbody = base64.b64encode(compressed.read()).decode("utf-8")
payload = { output = {
"version": "2.0", "version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-encoding": "gzip",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry", "routeKey": "PUT /sondes/telemetry",
"stage": "$default", "rawPath": "/sondes/telemetry",
"time": "31/Jan/2021:00:10:25 +0000", "rawQueryString": "",
"timeEpoch": 1612051825409, "headers": {
}, "accept": "*/*",
"body": bbody, "accept-encoding": "gzip, deflate",
"isBase64Encoded": True, "content-encoding": "gzip",
} "content-length": "2135",
class fakeContext: "content-type": "application/json",
def __init__(self): "host": "api.v2.sondehub.org",
self.log_stream_name = str(uuid.uuid4()) "user-agent": "autorx-1.4.1-beta4",
print(lambda_handler(payload, fakeContext())) "x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": bbody,
"isBase64Encoded": True,
}
return output
logs.put_log_events = MagicMock(return_value={'nextSequenceToken':1})
logs.create_log_stream = MagicMock(return_value={'nextSequenceToken':1})
class TestIngestion(unittest.TestCase):
def setUp(self):
sns.publish = MagicMock() # we reset the mock for every time so we can assert correctly if its called - this won't work when doing parallel testing
def test_report_time_too_late(self):
payload = copy.deepcopy(example_body)
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_not_called()
body_decode = json.loads(output["body"])
self.assertEqual(body_decode["message"], "some or all payloads could not be processed")
self.assertEqual(body_decode["errors"][0]["error_message"],"Sonde reported time too far from current UTC time. Either sonde time or system time is invalid. (Threshold: 48 hours)")
def test_good_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_called()
self.assertEqual(output["body"], "^v^ telm logged")
self.assertEqual(output["statusCode"], 200)
def test_good_ttgo_devel_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "rdzTTGOsonde"
payload[0]["software_version"] = "devel20230829"
payload[0]["type"] = "DFM"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_called()
self.assertEqual(output["body"], "^v^ telm logged")
self.assertEqual(output["statusCode"], 200)
def test_good_ttgo_master_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "rdzTTGOsonde"
payload[0]["software_version"] = "master_v0.9.3"
payload[0]["type"] = "DFM"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_called()
self.assertEqual(output["body"], "^v^ telm logged")
self.assertEqual(output["statusCode"], 200)
def test_bad_ttgo_devel_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "rdzTTGOsonde"
payload[0]["software_version"] = "devel20230104"
payload[0]["type"] = "DFM"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_not_called()
body_decode = json.loads(output["body"])
self.assertEqual(body_decode["message"], "some or all payloads could not be processed")
def test_bad_ttgo_master_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "rdzTTGOsonde"
payload[0]["software_version"] = "master_v0.9.2"
payload[0]["type"] = "DFM"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_not_called()
body_decode = json.loads(output["body"])
self.assertEqual(body_decode["message"], "some or all payloads could not be processed")
def test_weird_ttgo_branch_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "rdzTTGOsonde"
payload[0]["software_version"] = "multich_v3"
payload[0]["type"] = "DFM"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_not_called()
body_decode = json.loads(output["body"])
self.assertEqual(body_decode["message"], "some or all payloads could not be processed")
def test_good_dxlaprsshue_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "dxlAPRS-SHUE"
payload[0]["software_version"] = "1.1.2"
payload[0]["type"] = "M10"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_called()
self.assertEqual(output["body"], "^v^ telm logged")
self.assertEqual(output["statusCode"], 200)
def test_bad_dxlaprsshue_payload(self):
payload = copy.deepcopy(example_body)
payload[0]["datetime"] = datetime.datetime.now().isoformat()
payload[0]["software_name"] = "dxlAPRS-SHUE"
payload[0]["software_version"] = "1.0.2"
payload[0]["type"] = "M10"
output = lambda_handler(compress_payload(payload), fakeContext())
sns.publish.assert_not_called()
body_decode = json.loads(output["body"])
self.assertEqual(body_decode["message"], "some or all payloads could not be processed")
if __name__ == '__main__':
unittest.main()