sondehub-infra/sonde-api-to-iot-core/lambda_function.py

416 lines
17 KiB
Python
Raw Normal View History

2021-05-23 13:14:15 +00:00
import sys
2021-01-30 11:07:34 +00:00
import json
import boto3
import zlib
import base64
import datetime
import functools
from email.utils import parsedate
2021-02-01 06:08:59 +00:00
import os
2021-08-19 01:38:49 +00:00
import re
from math import radians, degrees, sin, cos, atan2, sqrt, pi
import statistics
from collections import defaultdict
def z_check(data, threshold):
outliers = []
mean = statistics.mean(data)
sd = statistics.stdev(data)
for index, i in enumerate(data):
try:
z = (i-mean)/sd # calculate z-score
except ZeroDivisionError:
continue
if abs(z) > threshold: # identify outliers
outliers.append(index) # add to the empty list
return outliers
2021-01-30 11:07:34 +00:00
# todo
# we should add some value checking
# we typically perform version banning here based on user agent
# error handling - at the moment we bail on a single failure
# report to the user what's happened
2021-08-19 01:38:49 +00:00
# Earthmaths code by Daniel Richman (thanks!)
# Copyright 2012 (C) Daniel Richman; GNU GPL 3
def position_info(listener, balloon):
"""
Calculate and return information from 2 (lat, lon, alt) tuples
Returns a dict with:
- angle at centre
- great circle distance
- distance in a straight line
- bearing (azimuth or initial course)
- elevation (altitude)
Input and output latitudes, longitudes, angles, bearings and elevations are
in degrees, and input altitudes and output distances are in meters.
"""
# Earth:
radius = 6371000.0
(lat1, lon1, alt1) = listener
(lat2, lon2, alt2) = balloon
lat1 = radians(lat1)
lat2 = radians(lat2)
lon1 = radians(lon1)
lon2 = radians(lon2)
# Calculate the bearing, the angle at the centre, and the great circle
# distance using Vincenty's_formulae with f = 0 (a sphere). See
# http://en.wikipedia.org/wiki/Great_circle_distance#Formulas and
# http://en.wikipedia.org/wiki/Great-circle_navigation and
# http://en.wikipedia.org/wiki/Vincenty%27s_formulae
d_lon = lon2 - lon1
sa = cos(lat2) * sin(d_lon)
sb = (cos(lat1) * sin(lat2)) - (sin(lat1) * cos(lat2) * cos(d_lon))
bearing = atan2(sa, sb)
aa = sqrt((sa ** 2) + (sb ** 2))
ab = (sin(lat1) * sin(lat2)) + (cos(lat1) * cos(lat2) * cos(d_lon))
angle_at_centre = atan2(aa, ab)
great_circle_distance = angle_at_centre * radius
# Armed with the angle at the centre, calculating the remaining items
# is a simple 2D triangley circley problem:
# Use the triangle with sides (r + alt1), (r + alt2), distance in a
# straight line. The angle between (r + alt1) and (r + alt2) is the
# angle at the centre. The angle between distance in a straight line and
# (r + alt1) is the elevation plus pi/2.
# Use sum of angle in a triangle to express the third angle in terms
# of the other two. Use sine rule on sides (r + alt1) and (r + alt2),
# expand with compound angle formulae and solve for tan elevation by
# dividing both sides by cos elevation
ta = radius + alt1
tb = radius + alt2
ea = (cos(angle_at_centre) * tb) - ta
eb = sin(angle_at_centre) * tb
elevation = atan2(ea, eb)
# Use cosine rule to find unknown side.
distance = sqrt((ta ** 2) + (tb ** 2) - 2 * tb * ta * cos(angle_at_centre))
# Give a bearing in range 0 <= b < 2pi
if bearing < 0:
bearing += 2 * pi
return {
"listener": listener,
"balloon": balloon,
"listener_radians": (lat1, lon1, alt1),
"balloon_radians": (lat2, lon2, alt2),
"angle_at_centre": degrees(angle_at_centre),
"angle_at_centre_radians": angle_at_centre,
"bearing": degrees(bearing),
"bearing_radians": bearing,
"great_circle_distance": great_circle_distance,
"straight_distance": distance,
"elevation": degrees(elevation),
"elevation_radians": elevation,
}
# stolen from https://github.com/projecthorus/radiosonde_auto_rx/blob/master/auto_rx/auto_rx.py#L467
def telemetry_filter(telemetry):
"""Filter incoming radiosonde telemetry based on various factors,
- Invalid Position
- Invalid Altitude
- Abnormal range from receiver.
- Invalid serial number.
- Abnormal date (more than 6 hours from utcnow)
This function is defined within this script to avoid passing around large amounts of configuration data.
"""
# First Check: zero lat/lon
if (telemetry["lat"] == 0.0) and (telemetry["lon"] == 0.0):
return (False,f"Zero Lat/Lon. Sonde { telemetry['serial']} does not have GPS lock.")
max_altitude = 50000
# Second check: Altitude cap.
if telemetry["alt"] > max_altitude:
_altitude_breach = telemetry["alt"] - max_altitude
return (False,f"Sonde {telemetry['serial']} position breached altitude cap by {_altitude_breach}m.")
2021-08-19 02:11:16 +00:00
if telemetry["alt"] == 0:
return (False,f"Sonde {telemetry['serial']} altitude is exactly 0m. Position is likely incorrect")
2021-09-03 11:09:37 +00:00
if "humidity" in telemetry and telemetry["humidity"] < 0:
return (False,f"Humidity {telemetry['humidity']} is below 0")
if "pressure" in telemetry and telemetry["pressure"] < 0:
return (False,f"Pressure {telemetry['pressure']} is below 0")
2021-08-19 01:38:49 +00:00
# Third check: Number of satellites visible.
if "sats" in telemetry:
if telemetry["sats"] < 4:
return (False, f"Sonde {telemetry['serial']} can only see {telemetry['sats']} SVs - discarding position as bad.")
max_radius = 1000
# Fourth check - is the payload more than x km from our listening station.
# Only run this check if a station location has been provided.
if 'uploader_position' in telemetry and telemetry['uploader_position'] != None and telemetry['uploader_position'][0] != None and telemetry['uploader_position'][1] != None and telemetry['uploader_position'][2] != None and float(telemetry['uploader_position'][0]) != 0 and float(telemetry['uploader_position'][1]) != 0:
# Calculate the distance from the station to the payload.
_listener = (
float(telemetry['uploader_position'][0]),
float(telemetry['uploader_position'][1]),
float(telemetry['uploader_position'][2])
)
_payload = (telemetry["lat"], telemetry["lon"], telemetry["alt"])
# Calculate using positon_info function from rotator_utils.py
_info = position_info(_listener, _payload)
if _info["straight_distance"] > max_radius * 1000:
_radius_breach = (
_info["straight_distance"] / 1000.0 - max_radius
)
return (False, f"Sonde {telemetry['serial']} position breached radius cap by {_radius_breach:.1f} km.")
# DateTime Check
2021-09-05 07:57:21 +00:00
try:
_delta_time = (
datetime.datetime.now() - datetime.datetime.fromisoformat(telemetry["datetime"].replace("Z",""))
).total_seconds()
except:
return (False, f"Unable to parse time")
2021-08-19 01:38:49 +00:00
sonde_time_threshold = 48
if abs(_delta_time) > (3600 * sonde_time_threshold):
return (False, f"Sonde reported time too far from current UTC time. Either sonde time or system time is invalid. (Threshold: {sonde_time_threshold} hours)")
# Payload Serial Number Checks
_serial = telemetry["serial"]
# Run a Regex to match known Vaisala RS92/RS41 serial numbers (YWWDxxxx)
# RS92: https://www.vaisala.com/sites/default/files/documents/Vaisala%20Radiosonde%20RS92%20Serial%20Number.pdf
# RS41: https://www.vaisala.com/sites/default/files/documents/Vaisala%20Radiosonde%20RS41%20Serial%20Number.pdf
# This will need to be re-evaluated if we're still using this code in 2021!
# UPDATE: Had some confirmation that Vaisala will continue to use the alphanumeric numbering up until
# ~2025-2030, so have expanded the regex to match (and also support some older RS92s)
# Modified 2021-06 to be more flexible and match older sondes, and reprogrammed sondes.
# Still needs a letter at the start, but the numbers don't need to match the format exactly.
vaisala_callsign_valid = re.match(r"[C-Z][\d][\d][\d]\d{4}", _serial)
# Just make sure we're not getting the 'xxxxxxxx' unknown serial from the DFM decoder.
if "DFM" in telemetry["type"]:
dfm_callsign_valid = "x" not in _serial
else:
dfm_callsign_valid = False
# Check Meisei sonde callsigns for validity.
# meisei_ims returns a callsign of IMS100-xxxxxx until it receives the serial number, so we filter based on the x's being present or not.
if "MEISEI" in telemetry["type"]:
meisei_callsign_valid = "x" not in _serial
else:
meisei_callsign_valid = False
if "MRZ" in telemetry["type"]:
mrz_callsign_valid = "x" not in _serial
else:
mrz_callsign_valid = False
# If Vaisala or DFMs, check the callsigns are valid. If M10, iMet or LMS6, just pass it through - we get callsigns immediately and reliably from these.
2021-09-03 09:46:01 +00:00
if not (
2021-08-19 01:38:49 +00:00
vaisala_callsign_valid
or dfm_callsign_valid
or meisei_callsign_valid
or mrz_callsign_valid
or ("M10" in telemetry["type"])
or ("M20" in telemetry["type"])
or ("LMS" in telemetry["type"])
or ("iMet" in telemetry["type"])
):
_id_msg = "Payload ID %s is invalid." % telemetry["serial"]
# Add in a note about DFM sondes and their oddness...
if "DFM" in telemetry["serial"]:
_id_msg += " Note: DFM sondes may take a while to get an ID."
if "MRZ" in telemetry["serial"]:
_id_msg += " Note: MRZ sondes may take a while to get an ID."
return (False, _id_msg)
2021-09-03 09:46:01 +00:00
if "dev" in telemetry:
return (False, "All checks passed however payload contained dev flag so will not be uploaded to the database")
return (True, "")
2021-08-19 01:38:49 +00:00
2021-01-30 11:07:34 +00:00
2021-05-23 13:14:15 +00:00
def set_connection_header(request, operation_name, **kwargs):
request.headers['Connection'] = 'keep-alive'
2021-01-30 11:07:34 +00:00
sns = boto3.client("sns",region_name="us-east-1")
sns.meta.events.register('request-created.sns', set_connection_header)
2021-02-01 04:10:25 +00:00
def post(payload):
sns.publish(
TopicArn=os.getenv("SNS_TOPIC"),
Message=json.dumps(payload)
)
2021-01-30 11:07:34 +00:00
def upload(event, context):
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
2021-09-17 13:50:36 +00:00
- datetime.datetime.utcnow() # TODO we should use the request context datetime here so that timedelta works for chunked requests
).total_seconds()
except:
pass
payloads = json.loads(event["body"])
to_sns = []
first = False
2021-08-19 01:38:49 +00:00
errors = []
# perform z check
payload_serials = defaultdict(list)
for payload in payloads:
payload_serials[payload['serial']].append(payload)
# rebuild payloads with the outliers marked
payloads = []
for serial in payload_serials:
check_data = payload_serials[serial]
if len(check_data) > 10: # need at least 10 payloads to be useful
lats = [ x['lat'] for x in check_data ]
lons = [ abs(x['lon']) for x in check_data ]
alts = [ x['alt'] for x in check_data ]
lat_outliers = z_check(lats, 3)
lon_outliers = z_check(lons, 3)
alt_outliers = z_check(alts, 3)
if lat_outliers or lon_outliers or alt_outliers:
print("Outlier check detected outlier, serial:", check_data[0]["serial"])
for index in lat_outliers:
payload_serials[serial][index]["lat_outlier"] = True
for index in lon_outliers:
payload_serials[serial][index]["lon_outliers"] = True
for index in alt_outliers:
payload_serials[serial][index]["alt_outliers"] = True
#generate error messages and regenerate payload list of bad data removed
for payload in payload_serials[serial]:
if "alt_outliers" in payload or "lon_outliers" in payload or "lat_outlier" in payload:
errors.append({
"error_message": f"z-check failed - payload GPS may not be valid for {payload['serial']}.",
"payload": payload
})
else:
payloads.append(payload)
2021-09-03 11:21:11 +00:00
2021-08-23 12:04:36 +00:00
for payload in payloads:
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
payload["position"] = f'{payload["lat"]},{payload["lon"]}'
2021-08-19 01:38:49 +00:00
valid, error_message = telemetry_filter(payload)
if not valid:
errors.append({
"error_message": error_message,
"payload": payload
})
else:
if time_delta:
payload["upload_time_delta"] = time_delta
if "uploader_position" in payload:
if not payload["uploader_position"]:
payload.pop("uploader_position")
else:
(payload["uploader_alt"], payload["uploader_position"]) = (
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
to_sns.append(payload)
2021-09-03 11:21:11 +00:00
# limit payloads to 3
if to_sns:
last = to_sns.pop()
to_sns = to_sns[::5]
to_sns.append(last)
post(to_sns)
2021-08-19 01:38:49 +00:00
return errors
2021-05-23 13:14:15 +00:00
def lambda_handler(event, context):
2021-08-20 08:48:41 +00:00
try:
errors = upload(event, context)
except zlib.error:
return {"statusCode": 400, "body": "Could not decompress"}
except json.decoder.JSONDecodeError:
return {"statusCode": 400, "body": "Not valid json"}
2021-08-19 01:38:49 +00:00
error_message = {
"message": "some or all payloads could not be processed",
"errors": errors
}
if errors:
2021-08-23 12:04:36 +00:00
output = {
"statusCode": 202,
"body": json.dumps(error_message),
"headers": {
"content-type": "application/json"
}
}
print({
"statusCode": 202,
"body": error_message,
"headers": {
"content-type": "application/json"
}
})
return output
2021-08-19 01:38:49 +00:00
else:
return {"statusCode": 200, "body": "^v^ telm logged"}
2021-02-01 06:08:59 +00:00
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-encoding": "gzip",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
2021-02-01 06:08:59 +00:00
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
2021-02-01 06:08:59 +00:00
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
2021-02-01 06:08:59 +00:00
"timeEpoch": 1612051825409,
},
2021-09-03 11:21:11 +00:00
"body": "H4sIAFsEMmEAA+XaS2/jNhAA4Pv+CiNnh+X74fOiLYr2UvTUojC0MbMRYEuuLDtdFP3vHWlFKiapmMAaEFAtkMOOKVkjfiGHZP74sFr9Az+r1cOpfm5fi8Zuq+JgHzarh6bYlfWprnZ2W5zbetv8/bAOml5scyrrqmtNkEACGnz42uR83NfFzjbbp2K/P5Wf+zYffyY//fjrI+nbXTcrqtZWVdHf6Tu+ei0udnWoq/pY76372rY82G1jn2x5sbuuJcWUPGLziNlvWG4E3giOhDBaqt/dNbuitd116eYM4f6fb34oqvNz8dSeG9t0l/zQFK/+678c+9t8/P4X/yLOn95EsfFx25TFvk9GY40Zx+6T5+br2yWMMGYIFmz4YF+0EOYKaUYUd8H+3RKMtGGaDMFi37UkUmvEqXs2ezhCUCE5BC52v71AhCLC34ReIAT9pIfQi4U+rj5DUENDd7NT0Z4gZIb/firatr+MuTu9nA/lrmy/QPSR+MTsX2dbPXVBjikS2F1/qpo+Yf9sx/pUtgMbn7BL0r2oUUafruAC4v+u/2deBeKSSZzrlc/vlae8imuvtO9KE3g1GBl+2ytLeFWRV6UQY7leb9qEb522KZxNsyybCmwaTmSmTTm/TZmyqa9tdl3JsQptSsQjmyKyyWObOLapCSL6bjYZItM29doltCCbZIMZvCcqlcixCc3p7DYlTdjUKmFTBDYVjE5c3rYpTGCTI8oimwpn2KS5NumkTEjNpbMsmfAM0NtaZ8qcf0aXqRndJGD6qtTDVMiwECaPYZIAph4JXg2avuG3w2TvTOhm7fJZlkyY4qDQNlm1Zg95dpkiJZMkaGoW0KQYMXKbJglpCkRjmhKqdHM3mgSpaZpk7RJals1uKauFzJ3P1fw2VcqmiG2KUKZAMkOmkIFMPRafbwZNjPDNVXv+bO6fIyFzWAWJZbk0SDAmNcl0qed3qVMuE3Xm2PtephrVTcqEbqGBTIFimJIgKu4I850hcyg0BV7S8pxsYB4RHLPcQhPe0dw0FY5p0gRMGg6ZjCA/jL4D04QLIIlMos5Uo+B7bBxNLs7p2qWzLJfdLglXima6JPO7JAmXOFFmChaugBgfdyG9TBbIpIiJSGZi20gBpptl5l1k4qHMhISWZZMjLnH2EojMfzykEsdDFMuETT+oeZsaaZ1hk0U2ZcImLIzU3WxOrs271Fw6y5IJi3NFlOSZMudfnKvE4pzixL6R8CWpk8kZ0tHBZSyThIUmLIFIJFOYsTj4dpnYb6wmbA4bR5DQsmxC5yiuOM60Of9BkEocBFGSqjVVZFMmNo5imyIcNRXyo/LVjK7veUg5uQrqknMJLcsmjAnKYJa5cUTm3zhSwcaR7ruPJWzq8CCIG/hFvG3ThIeUciwErv7gQ93P5vSMDh+5dN6X+ed/QRi1DYwkAAA=",
2021-02-01 06:08:59 +00:00
"isBase64Encoded": True,
}
2021-05-23 13:14:15 +00:00
print(lambda_handler(payload, {}))