mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2025-01-10 23:12:37 +00:00
408 lines
14 KiB
Python
408 lines
14 KiB
Python
import json
|
|
import boto3
|
|
import zlib
|
|
import base64
|
|
import datetime
|
|
from email.utils import parsedate
|
|
import os
|
|
import config_handler
|
|
|
|
# ARN of the SNS topic that post() publishes telemetry batches to, read from
# the "HAM_SNS" section of the project configuration.
TOPIC = config_handler.get("HAM_SNS", "TOPIC")

# Reported as 'software_version' in every telemetry document built by the
# Helium and TTN upload handlers in this module.
HELIUM_GW_VERSION = "2024.11.10"
|
|
|
|
# Accepted input field names (as found in the decoded Helium / TTN payload)
# paired with the field name used in the SondeHub-Amateur telemetry document.
# Each entry is a two-element list: [input_name, output_name]. Several input
# names may map onto the same output name; entries are applied in order, so a
# later alias overwrites an earlier one when both are present in the input.
FIELD_MAPPINGS = [
    [input_name, output_name]
    for input_name, output_name in (
        # Position (short and long spellings)
        ("lat", "lat"),
        ("lon", "lon"),
        ("alt", "alt"),
        ("latitude", "lat"),
        ("longitude", "lon"),
        ("altitude", "alt"),
        # GNSS / power
        ("sats", "sats"),
        ("battery", "batt"),
        ("batt", "batt"),
        # Motion
        ("speed", "speed"),
        ("heading", "heading"),
        # Environmental sensors
        ("temp", "temp"),
        ("temperature", "temp"),
        ("ext_temperature", "ext_temperature"),
        ("ext_pressure", "ext_pressure"),
        ("pressure", "ext_pressure"),
        ("ext_humidity", "ext_humidity"),
        # Inertial sensors
        ("accel_x", "accel_x"),
        ("accel_y", "accel_y"),
        ("accel_z", "accel_z"),
        ("gyro_x", "gyro_x"),
        ("gyro_y", "gyro_y"),
        ("gyro_z", "gyro_z"),
        # Light
        ("illuminance", "illuminance"),
    )
]
|
|
|
|
|
|
def set_connection_header(request, operation_name, **kwargs):
    """Set the ``Connection: keep-alive`` header on an outgoing request.

    Written as an event handler: ``request`` must expose a mapping-like
    ``headers`` attribute. ``operation_name`` and any extra keyword
    arguments are accepted but not used.
    """
    outgoing_headers = request.headers
    outgoing_headers['Connection'] = 'keep-alive'
|
|
|
|
# Module-level SNS client, created once at import time and used by post().
sns = boto3.client(
    "sns",
    region_name="us-east-1",
)
# Run set_connection_header for every SNS request this client creates.
sns.meta.events.register(
    'request-created.sns',
    set_connection_header,
)
|
|
|
|
|
|
def post(payload):
    """Publish ``payload`` to the SNS topic ``TOPIC`` as a JSON string.

    ``payload`` must be JSON-serialisable; the return value of the publish
    call is discarded.
    """
    message = json.dumps(payload)
    sns.publish(TopicArn=TOPIC, Message=message)
|
|
|
|
|
|
def _helium_ms_to_iso(timestamp_ms):
    """Format a millisecond UNIX timestamp as a UTC ISO-8601 string ending in 'Z'."""
    moment = datetime.datetime.fromtimestamp(
        timestamp_ms / 1000.0, tz=datetime.timezone.utc
    )
    # Drop tzinfo so isoformat() emits no "+00:00" offset; this keeps the
    # output identical to the deprecated datetime.utcfromtimestamp() form.
    return moment.replace(tzinfo=None).isoformat() + "Z"


def _helium_error(stage, detail, payload):
    """Build one entry for the ``errors`` list returned by upload_helium()."""
    return {
        "error_message": f"Error parsing {stage} data - {str(detail)}",
        "payload": payload
    }


def _helium_map_fields(telem, telem_data):
    """Copy recognised fields from ``telem_data`` into ``telem`` and validate them.

    Raises KeyError when no latitude/longitude was mapped and IOError when
    no altitude was mapped.
    """
    for input_name, output_name in FIELD_MAPPINGS:
        if input_name in telem_data:
            telem[output_name] = telem_data[input_name]

    # Position field, required by OpenSearch.
    # If lat/lon are not in the telemetry, then this will raise KeyError.
    telem["position"] = f'{telem["lat"]},{telem["lon"]}'

    # We also need altitude as a minimum.
    if 'alt' not in telem:
        raise IOError("No altitude field")


def _helium_process_chirpstack(payload, to_sns, errors):
    """Convert one Chirpstack payload into per-receiver telemetry documents.

    Documents are appended to ``to_sns``; problems are appended to ``errors``.
    """
    try:
        telem = {
            'software_name': 'SondeHub-Amateur Helium Gateway',
            'software_version': HELIUM_GW_VERSION
        }

        # Device Name -> Payload Callsign
        telem['payload_callsign'] = payload["deviceInfo"]["deviceName"]

        # Time: keep everything before the first "+" and append "Z".
        # NOTE(review): assumes the timestamp carries a "+hh:mm" offset - confirm.
        telem['datetime'] = payload["time"].split("+")[0] + "Z"

        # Positional and other data
        _helium_map_fields(telem, payload["object"]["decoded"])

        # Extract raw payload data, base64
        telem["raw"] = payload["data"]

        # Radio parameters shared by every receiver of this packet. Parsed
        # inside the try so a malformed 'txInfo'/'rxInfo' is reported as an
        # error entry instead of aborting the whole request.
        lora = payload['txInfo']['modulation']['lora']
        frequency = payload['txInfo']['frequency'] / 1e6
        bandwidth = int(lora['bandwidth'] / 1000)
        spreading_factor = lora['spreadingFactor']
        # Drop the first three characters and any underscores
        # (presumably "CR_4_5" -> "45"; verify against real Chirpstack data).
        code_rate = lora['codeRate'][3:].replace('_', '')
        modulation = f"Helium (SF{spreading_factor}BW{bandwidth}CR{code_rate})"

        hotspots = list(payload['rxInfo'])

    except Exception as e:
        errors.append(_helium_error("telemetry", e, payload))
        return

    # Now iterate through the receiving stations
    for hotspot in hotspots:
        try:
            hotspot_telem = telem.copy()

            hotspot_telem['uploader_callsign'] = hotspot["metadata"]["gateway_name"]
            hotspot_telem['modulation'] = modulation
            hotspot_telem['snr'] = hotspot['snr']
            hotspot_telem['rssi'] = hotspot['rssi']
            hotspot_telem['frequency'] = frequency
            hotspot_telem['time_received'] = hotspot["gwTime"].split("+")[0] + "Z"

            # Receiver position is optional: skip it when not supplied.
            try:
                position = f'{hotspot["metadata"]["gateway_lat"]},{hotspot["metadata"]["gateway_long"]}'
            except (KeyError, TypeError):
                pass
            else:
                hotspot_telem['uploader_position'] = position
                hotspot_telem['uploader_alt'] = 0

            to_sns.append(hotspot_telem)

        except Exception as e:
            errors.append(_helium_error("hotspot", e, payload))


def _helium_process_legacy(payload, to_sns, errors):
    """Convert one legacy Helium IoT payload into per-receiver telemetry documents.

    Documents are appended to ``to_sns``; problems are appended to ``errors``.
    """
    try:
        telem = {
            'software_name': 'SondeHub-Amateur Helium Gateway',
            'software_version': HELIUM_GW_VERSION
        }

        # Name -> Payload Callsign
        telem['payload_callsign'] = payload['name']

        # Time ('reported_at' is divided by 1000, i.e. treated as milliseconds)
        telem['datetime'] = _helium_ms_to_iso(payload["reported_at"])

        # Positional and other data
        _helium_map_fields(telem, payload["decoded"]["payload"])

        # Extract raw payload data, base64
        telem["raw"] = payload["payload"]

        # Fetched inside the try so a missing/invalid 'hotspots' entry is
        # reported as an error entry instead of aborting the whole request.
        hotspots = list(payload['hotspots'])

    except Exception as e:
        errors.append(_helium_error("telemetry", e, payload))
        return

    # Now iterate through the receiving stations
    for hotspot in hotspots:
        try:
            hotspot_telem = telem.copy()

            hotspot_telem['uploader_callsign'] = hotspot['name']
            hotspot_telem['modulation'] = f"Helium ({hotspot['spreading']})"
            hotspot_telem['snr'] = hotspot['snr']
            hotspot_telem['rssi'] = hotspot['rssi']
            hotspot_telem['frequency'] = hotspot['frequency']
            hotspot_telem['time_received'] = _helium_ms_to_iso(hotspot["reported_at"])

            # Receiver position is optional: skip it when not supplied.
            try:
                position = f'{hotspot["lat"]},{hotspot["long"]}'
            except (KeyError, TypeError):
                pass
            else:
                hotspot_telem['uploader_position'] = position
                hotspot_telem['uploader_alt'] = 0

            to_sns.append(hotspot_telem)

        except Exception as e:
            errors.append(_helium_error("hotspot", e, payload))


def upload_helium(event, context):
    """Parse a Helium (Chirpstack or legacy Helium IoT) upload request.

    Args:
        event: Request event. ``event["body"]`` holds a JSON object or a list
            of JSON objects; it is base64-decoded first when
            ``event["isBase64Encoded"]`` is true and gunzipped when the
            ``content-encoding`` header equals ``gzip``. ``event["body"]`` is
            overwritten with the decoded data.
        context: Unused.

    Returns:
        ``(errors, warnings)``. ``errors`` has one dict per payload or
        receiver that could not be parsed, with keys ``error_message`` and
        ``payload``. ``warnings`` is always empty.

    Raises:
        zlib.error: The gzip body could not be decompressed.
        json.decoder.JSONDecodeError: The body is not valid JSON.
    """
    if event.get("isBase64Encoded") is True:
        event["body"] = base64.b64decode(event["body"])
    if event["headers"].get("content-encoding") == "gzip":
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)

    payloads = json.loads(event["body"])
    to_sns = []
    errors = []
    warnings = []

    # Anything that is not already a list (a single object, or an invalid
    # scalar such as a number or null) becomes a single-entry list so that
    # it is handled, and if necessary rejected, by the per-payload code.
    if not isinstance(payloads, list):
        payloads = [payloads]

    for payload in payloads:
        if not isinstance(payload, dict):
            errors.append(
                _helium_error("telemetry", "payload is not a JSON object", payload)
            )
            continue

        # Use the presence of the 'reported_at' field to determine if this
        # data is from either Chirpstack, or the legacy Helium IoT.
        if "reported_at" in payload:
            _helium_process_legacy(payload, to_sns, errors)
        else:
            _helium_process_chirpstack(payload, to_sns, errors)

    # NOTE(review): the original code has `post(to_sns)` commented out here,
    # so the collected telemetry is never published. That behaviour is
    # preserved; confirm whether Helium publishing is intentionally disabled
    # before re-enabling it.
    return errors, warnings
|
|
|
|
|
|
def _ttn_error(stage, detail, payload):
    """Build one entry for the ``errors`` list returned by upload_ttn()."""
    return {
        "error_message": f"Error parsing {stage} data - {str(detail)}",
        "payload": payload
    }


def _ttn_hotspot_telemetry(telem, payload, hotspot):
    """Return a copy of ``telem`` extended with the metadata of one receiver.

    ``hotspot`` is one entry of the payload's ``rx_metadata`` list. Raises
    whatever lookup/conversion error a malformed entry produces; the caller
    records it.
    """
    hotspot_telem = telem.copy()

    hotspot_telem['uploader_callsign'] = hotspot['gateway_ids']['gateway_id']

    # Handle telemetry arriving via a packetbroker, and try and get the real gateway ID
    if hotspot_telem['uploader_callsign'] == "packetbroker":
        try:
            hotspot_telem['uploader_callsign'] = hotspot['packet_broker']['forwarder_gateway_id']
        except (KeyError, TypeError):
            # No forwarder ID available: keep "packetbroker" as the callsign.
            pass

    # Frequency and modulation metadata is common to all packets
    settings = payload['uplink_message']['settings']
    # Frequency is in Hz
    hotspot_telem['frequency'] = float(settings['frequency']) / 1e6

    # Construct the lora modulation details.
    lora = settings['data_rate']['lora']
    bandwidth = int(int(lora['bandwidth']) / 1000)
    spreading_factor = int(lora['spreading_factor'])
    coding_rate = lora['coding_rate'].replace('/', '')
    hotspot_telem['modulation'] = f"TTN (SF{spreading_factor}BW{bandwidth}CR{coding_rate})"

    # SNR and RSSI is unique to each receiver
    hotspot_telem['snr'] = hotspot['snr']
    hotspot_telem['rssi'] = hotspot['rssi']
    # There is also a channel_rssi field that we could include...

    # Can't seem to trust the timestamp in the per-receiver metadata
    # Example input has some very wrong timestamps in it.
    hotspot_telem['time_received'] = payload['received_at']

    # Receiver position is optional: skip it when not supplied.
    try:
        location = hotspot["location"]
        position = f'{location["latitude"]},{location["longitude"]}'
        altitude = location["altitude"] if 'altitude' in location else 0
    except (KeyError, TypeError):
        pass
    else:
        hotspot_telem['uploader_position'] = position
        hotspot_telem['uploader_alt'] = altitude

    return hotspot_telem


def upload_ttn(event, context):
    """Parse a TTN upload request and publish the telemetry.

    Args:
        event: Request event. ``event["body"]`` holds a JSON object or a list
            of JSON objects; it is base64-decoded first when
            ``event["isBase64Encoded"]`` is true and gunzipped when the
            ``content-encoding`` header equals ``gzip``. ``event["body"]`` is
            overwritten with the decoded data.
        context: Unused.

    Returns:
        ``(errors, warnings)``. ``errors`` has one dict per payload or
        receiver that could not be parsed, with keys ``error_message`` and
        ``payload``. ``warnings`` is always empty.

    Raises:
        zlib.error: The gzip body could not be decompressed.
        json.decoder.JSONDecodeError: The body is not valid JSON.
    """
    if event.get("isBase64Encoded") is True:
        event["body"] = base64.b64decode(event["body"])
    if event["headers"].get("content-encoding") == "gzip":
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)

    payloads = json.loads(event["body"])
    to_sns = []
    errors = []
    warnings = []

    # Anything that is not already a list (a single object, or an invalid
    # scalar such as a number or null) becomes a single-entry list so that
    # it is handled, and if necessary rejected, by the per-payload code.
    if not isinstance(payloads, list):
        payloads = [payloads]

    for payload in payloads:
        if not isinstance(payload, dict):
            errors.append(
                _ttn_error("telemetry", "payload is not a JSON object", payload)
            )
            continue

        try:
            telem = {
                'software_name': 'SondeHub-Amateur TTN Gateway',
                'software_version': HELIUM_GW_VERSION
            }

            # Name -> Payload Callsign
            telem['payload_callsign'] = payload['end_device_ids']['application_ids']['application_id']

            # Time
            telem['datetime'] = payload['received_at']

            # Positional and other data
            telem_data = payload["uplink_message"]["decoded_payload"]

            # Work through all accepted field names and map them
            # into the output structure.
            for input_name, output_name in FIELD_MAPPINGS:
                if input_name in telem_data:
                    telem[output_name] = telem_data[input_name]

            # Position field, required by OpenSearch
            # If lat/lon are not in the telemetry, then this will error
            telem["position"] = f'{telem["lat"]},{telem["lon"]}'

            # We also need altitude as a minimum
            if 'alt' not in telem:
                raise IOError("No altitude field")

            # Extract raw payload data, base64
            telem["raw"] = payload["uplink_message"]["frm_payload"]

            # Fetched inside the try so a missing/invalid 'rx_metadata' is
            # reported as an error entry instead of aborting the request.
            hotspots = list(payload['uplink_message']['rx_metadata'])

        except Exception as e:
            errors.append(_ttn_error("telemetry", e, payload))
            continue

        # Now iterate through the receiving stations
        for hotspot in hotspots:
            try:
                to_sns.append(_ttn_hotspot_telemetry(telem, payload, hotspot))
            except Exception as e:
                errors.append(_ttn_error("hotspot", e, payload))

    # Nothing parsed successfully: do not publish an empty batch.
    if to_sns:
        post(to_sns)
    return errors, warnings
|
|
|
|
|
|
def lambda_handler(event, context, ttn_source=False):
    """Run the TTN or Helium uploader and turn its result into an HTTP response.

    Args:
        event: Request event passed through to the uploader.
        context: Passed through to the uploader.
        ttn_source: When true use upload_ttn(), otherwise upload_helium().

    Returns:
        A response dict: status 400 when the body could not be decompressed
        or is not valid JSON, status 202 with a JSON body listing the errors
        and warnings when there are any, and status 200 otherwise.
    """
    uploader = upload_ttn if ttn_source else upload_helium

    try:
        errors, warnings = uploader(event, context)
    except zlib.error:
        return {"statusCode": 400, "body": "Could not decompress"}
    except json.decoder.JSONDecodeError:
        return {"statusCode": 400, "body": "Not valid json"}

    # Clean run: nothing to report back.
    if not (errors or warnings):
        return {"statusCode": 200, "body": "^v^ telm logged"}

    error_message = {
        "message": "some or all payloads could not be processed or have warnings",
        "errors": errors,
        "warnings": warnings
    }
    output = {
        "statusCode": 202,
        "body": json.dumps(error_message),
        "headers": {
            "content-type": "application/json"
        }
    }
    # Log the response with the body left as a dict rather than a JSON string.
    print({**output, "body": error_message})
    return output
|
|
|
|
def lambda_handler_helium(event, context):
    """Entry point for Helium uploads: lambda_handler() with the Helium uploader."""
    return lambda_handler(event, context, ttn_source=False)
|
|
|
|
def lambda_handler_ttn(event, context):
    """Entry point for TTN uploads: lambda_handler() with the TTN uploader."""
    response = lambda_handler(event, context, True)
    return response