add recovery reports over mqtt

This commit is contained in:
xssfox 2025-01-29 13:30:38 +11:00
parent 632d54679b
commit 1a0df9402f
5 changed files with 225 additions and 61 deletions

View File

@ -6,6 +6,14 @@ from datetime import datetime, timedelta
import es
import boto3
import botocore.exceptions
import time
from email.utils import parsedate
import sys
sys.path.append("sns_to_mqtt/vendor")
import config_handler
import random
def sondeExists(serial):
query = {
@ -104,8 +112,62 @@ def getRecovered(serial):
results = es.request(json.dumps(query), "recovered*/_search", "POST")
return results["aggregations"]["1"]["hits"]["hits"]
client = None
connected_flag = False
setup = False
import socket
socket.setdefaulttimeout(1)
## MQTT functions
def connect():
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
#client.tls_set()
client.username_pw_set(config_handler.get("MQTT","USERNAME"), password=config_handler.get("MQTT","PASSWORD"))
HOSTS = config_handler.get("MQTT","HOST").split(",")
PORT = int(config_handler.get("MQTT","PORT", default="8080"))
if PORT == 443:
client.tls_set()
HOST = random.choice(HOSTS)
print(f"Connecting to {HOST}")
client.connect(HOST, PORT, 5)
client.loop_start()
print("loop started")
def on_disconnect(client, userdata, rc):
global connected_flag
print("disconnected")
connected_flag=False #set flag
def on_connect(client, userdata, flags, rc):
global connected_flag
if rc==0:
print("connected")
connected_flag=True #set flag
else:
print("Bad connection Returned code")
def on_publish(client, userdata, mid):
pass
def put(event, context):
global client, setup
import paho.mqtt.client as mqtt
# Setup MQTT
if not client:
client = mqtt.Client(transport="websockets")
# Connect to MQTT
if not setup:
connect()
setup = True
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
@ -136,9 +198,22 @@ def put(event, context):
return {"statusCode": 400, "body": json.dumps({"message": "serial not found in db"})}
recovered['position'] = [float(recovered['lon']), float(recovered['lat'])]
while not connected_flag:
time.sleep(0.01) # wait until connected
client.publish(
topic=f'recovery/{recovered["serial"]}',
payload=json.dumps(recovered),
qos=0,
retain=False
)
result = es.request(json.dumps(recovered), "recovered/_doc", "POST")
time.sleep(0.3) # give paho mqtt 300ms to send messages this could be improved on but paho mqtt is a pain to interface with
# add in elasticsearch extra position field
return {"statusCode": 200, "body": json.dumps({"message": "Recovery logged. Have a good day ^_^"})}

View File

@ -1,62 +1,130 @@
from . import *
import recovered
import logging
import json
from datetime import datetime
import time
import unittest
from unittest.mock import MagicMock, call, patch
payload = {
"version": "2.0",
"routeKey": "PUT /recovered",
"rawPath": "/recovered",
"rawQueryString": "",
"queryStringParameters": {
# "datetime": "2021-12-20T00:00",
# "duration": 1000000
# "lat": "-32.7933",
# "lon": "151.835",
# "distance": "30000000"
"serial": "S5031499"
},
"headers": {
"accept": "*/*",
"accept-encoding": "deflate",
"content-encoding": "",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
# Mock OpenSearch requests
def mock_es_request(body, path, method):
if path == "telm-*/_search":
return {
"aggregations":{
"1": {
"hits": {
"hits": [1]
}
}
}
}
return {}
# if path.endswith("_bulk"): # handle when the upload happens
# return {}
# elif(path == "flight-doc/_search"): # handle flightdoc queries
# return mock_values.flight_docs
# elif(path == "ham-telm-*/_search"): # handle telm searches
# return mock_values.ham_telm
# else:
# raise NotImplemented
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG
)
class TestRecovered(unittest.TestCase):
def setUp(self):
recovered.es.request = MagicMock(side_effect=mock_es_request)
recovered.client = MagicMock()
recovered.client.connect = MagicMock()
recovered.client.loop_start = MagicMock()
recovered.client.username_pw_set = MagicMock()
recovered.client.tls_set = MagicMock()
recovered.client.publish = MagicMock()
def mock_config(a,b,default=""):
if b=="PORT":
return 1234
else:
return "test"
recovered.config_handler.get = mock_config
recovered.on_connect(recovered.client, "userdata", "flags", 0)
def tearDown(self): # reset some of the globals that get updated
recovered.setup = False
recovered.connected_flag = False
@patch("time.sleep")
def test_recovered(self, MockSleep):
r_payload = {
"datetime": "2021-06-06T01:10:07.629Z",
"serial": "S00000000",
"lat": 0,
"lon": 0,
"alt": 0,
"recovered": True,
"recovered_by": "string",
"description": "string"
}
recovered.put({
"version": "2.0",
"routeKey": "PUT /recovered",
"rawPath": "/recovered",
"rawQueryString": "",
"queryStringParameters": {
},
"headers": {
"accept": "*/*",
"accept-encoding": "deflate",
"content-encoding": "",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": json.dumps({
# "datetime": "2021-06-06T01:10:07.629Z",
# "serial": "S4631407",
# "lat": 0,
# "lon": 0,
# "alt": 0,
# "recovered": True,
# "recovered_by": "string",
# "description": "string"
}),
"body": json.dumps(r_payload),
"isBase64Encoded": False,
}
# print(put(payload, {}))
#print(get(payload, {}))
print(stats(payload, {}))
},{})
recovered.client.username_pw_set.assert_called()
recovered.client.loop_start.assert_called()
recovered.client.connect.assert_called()
recovered.client.publish.assert_called()
c_payload = r_payload
c_payload["position"] = [
float(r_payload['lat']),float(r_payload['lon'])
]
recovered.client.publish.assert_has_calls([
call(
topic="recovery/S00000000",
payload=json.dumps(r_payload),
qos=0,
retain=False
)
])
time.sleep.assert_called_with(0.3) # make sure we sleep to let paho mqtt queue clear
if __name__ == '__main__':
unittest.main()

View File

@ -69,7 +69,13 @@ def lambda_handler(event, context):
if "uploader_position" in payload and None == payload["uploader_position"] or None in payload["uploader_position"]:
payload.pop("uploader_position", None)
if (
"software_name" in payload and
payload["software_name"] == "SDRangel" and
"software_version" in payload and
payload["software_version"] == "7.22.5"
):
return {"statusCode": 403, "body": "This version is blocked"}
if "uploader_position" in payload:
(payload["uploader_alt"], payload["uploader_position_elk"]) = (
payload["uploader_position"][2],

View File

@ -64,6 +64,19 @@ class TestStation(unittest.TestCase):
mocked_print.assert_called()
json.loads(mocked_print.call_args.args[0])
@patch('builtins.print')
def test_blocked_software(self, mocked_print):
blocked_software_payload = copy.deepcopy(payload)
body = json.loads(blocked_software_payload["body"])
body["software_name"] = "SDRangel"
body["software_version"] = "7.22.5"
blocked_software_payload["body"] = json.dumps(body)
station_api_to_iot_core.lambda_handler(blocked_software_payload,{})
station_api_to_iot_core.es.request.assert_not_called()
station_api_to_iot_core.sns.publish.assert_not_called()
mocked_print.assert_called()
json.loads(mocked_print.call_args.args[0])
@patch('builtins.print')
def test_call(self, mocked_print):
blocked_call_payload = copy.deepcopy(payload)

View File

@ -103,7 +103,9 @@ resource "aws_lambda_function" "recovered_put" {
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = aws_route53_record.es.fqdn
"ES" = aws_route53_record.es.fqdn
MQTT_HOST = "ws.v2.sondehub.org"
MQTT_PORT = "443"
}
}
tags = {