sondehub-infra/query/lambda_function.py

511 lines
18 KiB
Python
Raw Normal View History

2021-02-02 17:14:38 +10:00
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
2021-03-29 13:44:33 +11:00
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
2021-02-02 17:14:38 +10:00
import json
import os
2021-02-22 16:13:30 +10:00
from datetime import datetime, timedelta, timezone
import sys, traceback
2021-02-02 17:14:38 +10:00
2021-02-02 20:44:39 +10:00
HOST = os.getenv("ES")
2021-02-02 17:14:38 +10:00
# get current sondes, filter by date, location
2021-02-02 20:44:39 +10:00
2021-02-02 17:14:38 +10:00
def get_sondes(event, context):
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
2021-02-02 20:44:39 +10:00
"terms": {
"field": "serial.keyword",
"order": {"_key": "desc"},
"size": 10000,
2021-02-02 17:14:38 +10:00
},
2021-02-02 20:44:39 +10:00
"aggs": {
"1": {
"top_hits": {
"size": 1,
"sort": [{"datetime": {"order": "desc"}}],
2021-02-02 17:14:38 +10:00
}
}
2021-02-02 20:44:39 +10:00
},
2021-02-02 17:14:38 +10:00
}
},
2021-02-02 20:44:39 +10:00
"query": {"bool": {"filter": [{"match_all": {}}]}},
2021-02-02 17:14:38 +10:00
}
# add filters
if "queryStringParameters" in event:
if "last" in event["queryStringParameters"]:
payload["query"]["bool"]["filter"].append(
{
"range": {
"datetime": {
"gte": f"now-{int(event['queryStringParameters']['last'])}s",
2021-02-02 20:44:39 +10:00
"lte": "now",
2021-02-02 17:14:38 +10:00
}
}
}
)
2021-02-02 20:44:39 +10:00
if (
"lat" in event["queryStringParameters"]
and "lon" in event["queryStringParameters"]
and "distance" in event["queryStringParameters"]
):
2021-02-02 17:14:38 +10:00
payload["query"]["bool"]["filter"].append(
{
"geo_distance": {
"distance": f"{int(event['queryStringParameters']['distance'])}m",
"position": {
2021-02-02 20:44:39 +10:00
"lat": float(event["queryStringParameters"]["lat"]),
"lon": float(event["queryStringParameters"]["lon"]),
},
2021-02-02 17:14:38 +10:00
}
}
)
# if the user doesn't specify a range we should add one - 24 hours is probably a good start
if "range" not in payload["query"]["bool"]["filter"]:
payload["query"]["bool"]["filter"].append(
2021-02-02 20:44:39 +10:00
{"range": {"datetime": {"gte": "now-1d", "lte": "now"}}}
)
results = es_request(payload, path, "POST")
buckets = results["aggregations"]["2"]["buckets"]
sondes = {
bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"][
"hits"
][0]["_source"]
for bucket in buckets
}
return json.dumps(sondes)
def get_telem(event, context):
durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES
"3d": (259200, 1200), # 3d, 20m
"1d": (86400, 600), # 1d, 10m
"6h": (21600, 60), # 6h, 1m
2021-04-01 19:47:58 +11:00
"3h": (10800, 15), # 3h, 10s
2021-02-02 20:44:39 +10:00
}
duration_query = "3h"
2021-02-15 15:23:51 +10:00
requested_time = datetime.now()
2021-02-02 20:44:39 +10:00
if (
"queryStringParameters" in event
and "duration" in event["queryStringParameters"]
):
if event["queryStringParameters"]["duration"] in durations:
duration_query = event["queryStringParameters"]["duration"]
else:
return f"Duration must be either {', '.join(durations.keys())}"
2021-02-15 15:23:51 +10:00
if (
"queryStringParameters" in event
and "datetime" in event["queryStringParameters"]
):
2021-03-28 15:30:44 +10:30
requested_time = datetime.fromisoformat(
event["queryStringParameters"]["datetime"].replace("Z", "+00:00")
)
2021-02-15 15:23:51 +10:00
2021-02-02 20:44:39 +10:00
(duration, interval) = durations[duration_query]
2021-02-15 15:23:51 +10:00
2021-03-28 15:30:44 +10:30
lt = requested_time
gte = requested_time - timedelta(0, duration)
2021-02-02 20:44:39 +10:00
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
"terms": {
"field": "serial.keyword",
"order": {"_key": "desc"},
"size": 10000,
},
"aggs": {
"3": {
"date_histogram": {
"field": "datetime",
"fixed_interval": f"{str(interval)}s",
"min_doc_count": 1,
},
"aggs": {
"1": {
"top_hits": {
2021-02-02 21:09:04 +10:00
# "docvalue_fields": [
# {"field": "position"},
# {"field": "alt"},
# {"field": "datetime"},
# ],
# "_source": "position",
2021-02-02 20:44:39 +10:00
"size": 1,
"sort": [{"datetime": {"order": "desc"}}],
}
}
},
}
},
}
},
"query": {
"bool": {
"filter": [
{"match_all": {}},
{
"range": {
2021-03-28 15:30:44 +10:30
"datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
2021-02-02 17:14:38 +10:00
}
2021-02-02 21:09:04 +10:00
},
2021-02-02 20:44:39 +10:00
]
}
},
}
if "queryStringParameters" in event:
if "serial" in event["queryStringParameters"]:
payload["query"]["bool"]["filter"].append(
{
"match_phrase": {
"serial": str(event["queryStringParameters"]["serial"])
}
2021-02-02 17:14:38 +10:00
}
)
results = es_request(payload, path, "POST")
2021-02-02 20:44:39 +10:00
output = {
sonde["key"]: {
2021-02-02 21:09:04 +10:00
data["key_as_string"]: data["1"]["hits"]["hits"][0]["_source"]
2021-02-02 20:44:39 +10:00
for data in sonde["3"]["buckets"]
}
for sonde in results["aggregations"]["2"]["buckets"]
}
return json.dumps(output)
2021-03-28 15:30:44 +10:30
2021-02-22 16:13:30 +10:00
def datanew(event, context):
durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES
"3days": (259200, 1200), # 3d, 20m
"1day": (86400, 600), # 1d, 10m
"12hours": (43200, 120), # 12h, 2m
"6hours": (21600, 60), # 6h, 1m
2021-03-17 19:58:53 +11:00
"3hours": (10800, 30), # 3h, 10s
"1hour": (3600, 15), # 1h, 5s
2021-02-22 16:13:30 +10:00
}
duration_query = "1hour"
requested_time = datetime.now()
2021-03-28 15:30:44 +10:30
if event["queryStringParameters"]["type"] != "positions":
2021-02-22 16:13:30 +10:00
raise ValueError
2021-03-28 15:30:44 +10:30
max_positions = (
int(event["queryStringParameters"]["max_positions"])
if "max_positions" in event["queryStringParameters"]
else 10000
)
2021-02-22 16:13:30 +10:00
if event["queryStringParameters"]["mode"] in durations:
duration_query = event["queryStringParameters"]["mode"]
else:
return f"Duration must be either {', '.join(durations.keys())}"
(duration, interval) = durations[duration_query]
2021-03-28 15:30:44 +10:30
if "vehicles" in event["queryStringParameters"] and (
event["queryStringParameters"]["vehicles"] != "RS_*;*chase"
and event["queryStringParameters"]["vehicles"] != ""
):
2021-03-17 19:58:53 +11:00
interval = 1
2021-02-22 16:13:30 +10:00
if event["queryStringParameters"]["position_id"] != "0":
2021-03-28 15:30:44 +10:30
requested_time = datetime.fromisoformat(
event["queryStringParameters"]["position_id"].replace("Z", "+00:00")
)
2021-02-22 16:13:30 +10:00
lt = datetime.now()
gte = requested_time
else:
lt = datetime.now()
2021-03-28 15:30:44 +10:30
gte = datetime.now() - timedelta(0, duration)
2021-02-22 16:13:30 +10:00
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
"terms": {
"field": "serial.keyword",
"order": {"_key": "desc"},
"size": 10000,
},
"aggs": {
"3": {
"date_histogram": {
"field": "datetime",
"fixed_interval": f"{str(interval)}s",
"min_doc_count": 1,
},
"aggs": {
"1": {
"top_hits": {
"size": 1,
"sort": [{"datetime": {"order": "desc"}}],
}
}
},
}
},
}
},
"query": {
"bool": {
"filter": [
{"match_all": {}},
{
"range": {
2021-03-28 15:30:44 +10:30
"datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
2021-02-22 16:13:30 +10:00
}
2021-03-28 15:30:44 +10:30
},
2021-03-22 19:29:37 +11:00
],
2021-03-28 15:30:44 +10:30
"must_not": [{"match_phrase": {"software_name": "SondehubV1"}}],
2021-02-22 16:13:30 +10:00
}
},
}
2021-03-28 15:30:44 +10:30
if (
"vehicles" in event["queryStringParameters"]
and event["queryStringParameters"]["vehicles"] != "RS_*;*chase"
and event["queryStringParameters"]["vehicles"] != ""
):
2021-02-22 16:13:30 +10:00
payload["query"]["bool"]["filter"].append(
{
"match_phrase": {
"serial": str(event["queryStringParameters"]["vehicles"])
}
}
)
results = es_request(payload, path, "POST")
2021-03-28 15:30:44 +10:30
output = {"positions": {"position": []}}
2021-02-22 16:13:30 +10:00
for sonde in results["aggregations"]["2"]["buckets"]:
for frame in sonde["3"]["buckets"]:
try:
2021-03-28 15:30:44 +10:30
frame_data = frame["1"]["hits"]["hits"][0]["_source"]
2021-03-28 16:13:44 +10:30
# Commented out until im sure we dont need it.
# frequency = (
# f'{frame_data["frequency"]} MHz'
# if "frequency" in frame_data
# else ""
# )
# pressure = (
# f'{frame_data["pressure"]}hPa' if "pressure" in frame_data else ""
# )
# bt = (
# f'BT {frame_data["burst_timer"]}'
# if "burst_timer" in frame_data
# else ""
# )
# batt = f'{frame_data["batt"]}V' if "batt" in frame_data else ""
# subtype = frame_data["subtype"] if "subtype" in frame_data else ""
2021-03-28 15:30:44 +10:30
# Use subtype if it exists, else just use the basic type.
if "subtype" in frame_data:
_type = frame_data["subtype"]
else:
_type = frame_data["type"]
2021-03-28 16:13:44 +10:30
data = {
"manufacturer": frame_data['manufacturer'],
"type": _type
}
2021-03-14 09:47:54 +11:00
if "temp" in frame_data:
data["temperature_external"] = frame_data["temp"]
2021-03-28 15:30:44 +10:30
if "humidity" in frame_data:
2021-03-14 09:47:54 +11:00
data["humidity"] = frame_data["humidity"]
2021-03-28 15:30:44 +10:30
2021-03-14 09:47:54 +11:00
if "pressure" in frame_data:
data["pressure"] = frame_data["pressure"]
2021-03-28 15:30:44 +10:30
2021-03-14 09:47:54 +11:00
if "sats" in frame_data:
data["sats"] = frame_data["sats"]
2021-03-28 15:30:44 +10:30
2021-03-14 10:43:05 +11:00
if "batt" in frame_data:
2021-03-14 09:47:54 +11:00
data["batt"] = frame_data["batt"]
2021-03-28 15:30:44 +10:30
if "burst_timer" in frame_data:
data["burst_timer"] = frame_data["burst_timer"]
if "frequency" in frame_data:
data["frequency"] = frame_data["frequency"]
2021-03-28 16:13:44 +10:30
# May need to revisit this, if the resultant strings are too long.
if "xdata" in frame_data:
data["xdata"] = frame_data["xdata"]
2021-03-28 15:30:44 +10:30
output["positions"]["position"].append(
{
"position_id": f'{frame_data["serial"]}-{frame_data["datetime"]}',
"mission_id": "0",
"vehicle": frame_data["serial"],
"server_time": frame_data["datetime"],
"gps_time": frame_data["datetime"],
"gps_lat": frame_data["lat"],
"gps_lon": frame_data["lon"],
"gps_alt": frame_data["alt"],
"gps_heading": frame_data["heading"]
if "heading" in frame_data
else "",
"gps_speed": frame_data["vel_h"],
"type": _type,
"picture": "",
"temp_inside": "",
"data": data,
"callsign": frame_data["uploader_callsign"],
"sequence": "0",
}
)
2021-02-22 16:13:30 +10:00
except:
2021-03-14 10:43:05 +11:00
pass
2021-03-28 15:30:44 +10:30
output["positions"]["position"] = sorted(
output["positions"]["position"], key=lambda k: k["position_id"]
)
2021-02-22 16:13:30 +10:00
return json.dumps(output)
def get_listeners(event, context):
path = "telm-*/_search"
payload = {
"aggs": {
"2": {
2021-03-28 15:30:44 +10:30
"terms": {
"field": "uploader_callsign.keyword",
"order": {"_key": "desc"},
"size": 500,
2021-02-22 16:13:30 +10:00
},
2021-03-28 15:30:44 +10:30
"aggs": {
"1": {
"top_hits": {
"_source": False,
"size": 1,
"docvalue_fields": [
"uploader_position",
"uploader_alt",
"uploader_antenna.keyword",
"software_name.keyword",
"software_version.keyword",
"datetime",
],
"sort": [{"datetime": {"order": "desc"}}],
2021-02-22 16:13:30 +10:00
}
}
2021-03-28 15:30:44 +10:30
},
2021-02-22 16:13:30 +10:00
}
},
"size": 0,
"query": {
"bool": {
2021-03-28 15:30:44 +10:30
"must": [],
"filter": [
{"match_all": {}},
{"exists": {"field": "uploader_position"},},
{"exists": {"field": "uploader_alt"},},
{"exists": {"field": "uploader_antenna.keyword"},},
{"exists": {"field": "software_name.keyword"},},
{"exists": {"field": "software_version.keyword"},},
{"exists": {"field": "datetime"},},
{
"range": {
"datetime": {
"gte": "now-7d",
"lte": "now",
"format": "strict_date_optional_time",
}
}
2021-02-22 16:13:30 +10:00
},
2021-03-28 15:30:44 +10:30
],
"should": [],
"must_not": [{"match_phrase": {"type": "SondehubV1"}}],
2021-02-22 16:13:30 +10:00
}
2021-03-28 15:30:44 +10:30
},
2021-02-22 16:13:30 +10:00
}
2021-03-28 15:30:44 +10:30
2021-02-22 16:13:30 +10:00
results = es_request(payload, path, "GET")
2021-03-28 15:30:44 +10:30
2021-02-22 16:13:30 +10:00
output = [
{
2021-03-28 15:30:44 +10:30
"name": listener["key"],
"tdiff_hours": (
datetime.now(timezone.utc)
- datetime.fromisoformat(
listener["1"]["hits"]["hits"][0]["fields"]["datetime"][0].replace(
"Z", "+00:00"
)
)
).seconds
/ 60
/ 60,
"lon": float(
listener["1"]["hits"]["hits"][0]["fields"]["uploader_position"][0]
.replace(" ", "")
.split(",")[1]
),
"lat": float(
listener["1"]["hits"]["hits"][0]["fields"]["uploader_position"][0]
.replace(" ", "")
.split(",")[0]
),
"alt": float(listener["1"]["hits"]["hits"][0]["fields"]["uploader_alt"][0]),
2021-02-22 16:13:30 +10:00
"description": f"""\n
<font size=\"-2\"><BR>\n
<B>Radio: {listener["1"]["hits"]["hits"][0]["fields"]["software_name.keyword"][0]}-{listener["1"]["hits"]["hits"][0]["fields"]["software_version.keyword"][0]}</B><BR>\n
<B>Antenna: </B>{listener["1"]["hits"]["hits"][0]["fields"]["uploader_antenna.keyword"][0]}<BR>\n
<B>Last Contact: </B>{listener["1"]["hits"]["hits"][0]["fields"]["datetime"][0]} <BR>\n
</font>\n
2021-03-28 15:30:44 +10:30
""",
2021-02-22 16:13:30 +10:00
}
for listener in results["aggregations"]["2"]["buckets"]
]
return json.dumps(output)
2021-02-02 20:44:39 +10:00
2021-02-02 17:14:38 +10:00
def es_request(payload, path, method):
2021-02-02 20:44:39 +10:00
# get aws creds
2021-02-02 17:14:38 +10:00
session = boto3.Session()
params = json.dumps(payload)
2021-02-02 20:44:39 +10:00
headers = {"Host": HOST, "Content-Type": "application/json"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
2021-02-02 17:14:38 +10:00
session = URLLib3Session()
r = session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
2021-02-02 20:44:39 +10:00
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
2021-02-22 16:13:30 +10:00
# mode: 6hours
2021-03-28 15:30:44 +10:30
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
2021-02-02 21:09:04 +10:00
print(
2021-02-22 16:13:30 +10:00
datanew(
2021-03-28 15:30:44 +10:30
{
"queryStringParameters": {
"mode": "1day",
"type": "positions",
"format": "json",
"max_positions": "0",
"position_id": "0",
"vehicles": "",
}
},
{},
2021-02-02 21:09:04 +10:00
)
)