sondehub-infra/query/lambda_function.py

537 lines
18 KiB
Python
Raw Normal View History

2021-02-02 07:14:38 +00:00
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
2021-02-22 06:13:30 +00:00
from datetime import datetime, timedelta, timezone
import sys, traceback
2021-04-20 08:06:21 +00:00
import re
2021-04-27 22:30:28 +00:00
import html
2021-07-18 22:38:31 +00:00
import base64
import gzip
from io import BytesIO
2021-02-02 07:14:38 +00:00
2021-02-02 10:44:39 +00:00
HOST = os.getenv("ES")
2021-02-02 07:14:38 +00:00
# get current sondes, filter by date, location
2021-02-02 10:44:39 +00:00
2021-02-02 07:14:38 +00:00
def get_sondes(event, context):
    """Return the most recent telemetry document for each sonde.

    Searches the ``telm-*`` indices, groups documents by ``serial.keyword``
    and keeps the newest document (sorted by ``datetime``) of every group.

    Recognised query string parameters (``event["queryStringParameters"]``):
        last: look-back window; parsed with ``int`` and sent to
            Elasticsearch with an ``s`` suffix. Defaults to ``now-1d``.
        lat, lon, distance: only applied when all three are present; adds a
            ``geo_distance`` filter on ``position`` (``distance`` is parsed
            with ``int`` and sent with an ``m`` suffix).

    Args:
        event: Lambda event dict. ``queryStringParameters`` may be absent,
            None or a dict.
        context: Lambda context (unused).

    Returns:
        A JSON string mapping each sonde serial to its latest ``_source``.

    Raises:
        ValueError: if ``last``, ``distance``, ``lat`` or ``lon`` cannot be
            converted to a number.
    """
    path = "telm-*/_search"

    # A missing key and an explicit None are treated the same way, so the
    # membership tests below never run against None.
    params = event.get("queryStringParameters") or {}

    if "last" in params:
        time_filter = {
            "range": {
                "datetime": {
                    "gte": f"now-{int(params['last'])}s",
                    "lte": "now+1m",
                }
            }
        }
    else:
        time_filter = {"range": {"datetime": {"gte": "now-1d", "lte": "now+1m"}}}

    filters = [{"match_all": {}}, time_filter]

    if "lat" in params and "lon" in params and "distance" in params:
        filters.append(
            {
                "geo_distance": {
                    "distance": f"{int(params['distance'])}m",
                    "position": {
                        "lat": float(params["lat"]),
                        "lon": float(params["lon"]),
                    },
                }
            }
        )

    payload = {
        "aggs": {
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {"_key": "desc"},
                    "size": 10000,
                },
                "aggs": {
                    "1": {
                        "top_hits": {
                            "size": 1,
                            "sort": [{"datetime": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
        "query": {"bool": {"filter": filters}},
    }

    results = es_request(payload, path, "POST")

    sondes = {}
    for bucket in results["aggregations"]["2"]["buckets"]:
        latest = bucket["1"]["hits"]["hits"][0]["_source"]
        sondes[latest["serial"]] = latest
    return json.dumps(sondes)
def get_telem(event, context):
    """Return down-sampled sonde telemetry as a gzipped JSON Lambda response.

    Documents are grouped by ``serial.keyword`` and then by a date histogram
    on ``datetime``; the newest document of each histogram bucket is kept.

    Recognised query string parameters (``event["queryStringParameters"]``):
        duration: one of the keys of ``durations`` below (default ``"3h"``).
        datetime: ISO-8601 end of the window (a trailing ``Z`` is accepted);
            defaults to the current UTC time.
        serial: restrict the query to one sonde; this also forces a
            1 second histogram interval.

    Args:
        event: Lambda event dict. ``queryStringParameters`` may be absent,
            None or a dict.
        context: Lambda context (unused).

    Returns:
        For an unknown ``duration``, a plain error string. Otherwise a dict
        with ``body`` (base64 of the gzipped JSON), ``isBase64Encoded``,
        ``statusCode`` and ``headers``. The JSON maps
        serial -> bucket timestamp -> document, where each document carries
        an extra ``uploaders`` list holding the ``snr``, ``rssi`` and
        ``uploader_callsign`` of every hit in the bucket.

    Raises:
        ValueError: if ``datetime`` is not parseable by
            ``datetime.fromisoformat``.
    """
    # The windows are predefined so that a request cannot ask Elasticsearch
    # for an unbounded amount of data.
    # label: (window length in seconds, histogram interval in seconds)
    durations = {
        "3d": (259200, 1200),  # 3d, 20m
        "1d": (86400, 600),  # 1d, 10m
        "12h": (43200, 600),  # 12h, 10m
        "6h": (21600, 120),  # 6h, 2m
        "3h": (10800, 60),  # 3h, 1m
        "1h": (3600, 40),  # 1h, 40s
        "30m": (1800, 20),  # 30m, 20s
        "1m": (60, 1),  # 1m, 1s
        "15s": (15, 1),  # 15s, 1s
        "0": (0, 1),  # for getting a single time point
    }

    # A missing key and an explicit None are treated the same way, so none of
    # the lookups below can fail on an event without query parameters.
    params = event.get("queryStringParameters") or {}

    duration_query = params.get("duration", "3h")
    if duration_query not in durations:
        return f"Duration must be either {', '.join(durations.keys())}"

    if "datetime" in params:
        requested_time = datetime.fromisoformat(
            params["datetime"].replace("Z", "+00:00")
        )
    else:
        requested_time = datetime.now(timezone.utc)

    (duration, interval) = durations[duration_query]
    if "serial" in params:
        # A single sonde is requested, so return every second of data.
        interval = 1

    lt = requested_time + timedelta(0, 1)
    gte = requested_time - timedelta(0, duration)

    # Only the monthly indices containing the two ends of the window are searched.
    path = f"telm-{lt.year:2}-{lt.month:02},telm-{gte.year:2}-{gte.month:02}/_search"

    payload = {
        "timeout": "30s",
        "size": 0,
        "aggs": {
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {"_key": "desc"},
                    "size": 10000,
                },
                "aggs": {
                    "3": {
                        "date_histogram": {
                            "field": "datetime",
                            "fixed_interval": f"{interval}s",
                            "min_doc_count": 1,
                        },
                        "aggs": {
                            "1": {
                                "top_hits": {
                                    # For a single time point, return up to 10
                                    # hits so that several uploaders can be listed.
                                    "size": 10 if duration == 0 else 1,
                                    "sort": [
                                        {"datetime": {"order": "desc"}},
                                        {"pressure": {"order": "desc", "mode": "median"}},
                                    ],
                                }
                            }
                        },
                    }
                },
            }
        },
        "query": {
            "bool": {
                "must_not": [
                    {"match_phrase": {"software_name": "SondehubV1"}},
                    {"match_phrase": {"serial": "xxxxxxxx"}},
                ],
                "filter": [
                    {"match_all": {}},
                    {
                        "range": {
                            "datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
                        }
                    },
                ],
            }
        },
    }
    if "serial" in params:
        payload["query"]["bool"]["filter"].append(
            {"match_phrase": {"serial": str(params["serial"])}}
        )

    results = es_request(payload, path, "POST")

    uploader_fields = ["snr", "rssi", "uploader_callsign"]
    output = {}
    for sonde in results["aggregations"]["2"]["buckets"]:
        frames = {}
        for data in sonde["3"]["buckets"]:
            hits = data["1"]["hits"]["hits"]
            frames[data["key_as_string"]] = dict(
                hits[0]["_source"],
                # add additional uploader information
                uploaders=[
                    {
                        key: value
                        for key, value in hit["_source"].items()
                        if key in uploader_fields
                    }
                    for hit in hits
                ],
            )
        output[sonde["key"]] = frames

    compressed = BytesIO()
    with gzip.GzipFile(fileobj=compressed, mode="w") as f:
        f.write(json.dumps(output).encode("utf-8"))

    return {
        "body": base64.b64encode(compressed.getvalue()).decode(),
        "isBase64Encoded": True,
        "statusCode": 200,
        "headers": {
            "Content-Encoding": "gzip",
            "content-type": "application/json",
        },
    }
def get_listener_telemetry(event, context):
    """Return down-sampled listener records as a gzipped JSON Lambda response.

    Documents in ``listeners-*`` are grouped by ``uploader_callsign.keyword``
    and then by a date histogram on ``ts``; the newest document of each
    histogram bucket is kept.

    Recognised query string parameters (``event["queryStringParameters"]``):
        duration: one of the keys of ``durations`` below (default ``"3h"``).
        datetime: ISO-8601 end of the window (a trailing ``Z`` is accepted);
            defaults to the current UTC time.
        uploader_callsign: restrict the query to one listener; this also
            forces a 1 second histogram interval.

    Args:
        event: Lambda event dict. ``queryStringParameters`` may be absent,
            None or a dict.
        context: Lambda context (unused).

    Returns:
        For an unknown ``duration``, a plain error string. Otherwise a dict
        with ``body`` (base64 of the gzipped JSON), ``isBase64Encoded``,
        ``statusCode`` and ``headers``. The JSON maps
        callsign -> bucket timestamp -> document ``_source``.

    Raises:
        ValueError: if ``datetime`` is not parseable by
            ``datetime.fromisoformat``.
    """
    # The windows are predefined so that a request cannot ask Elasticsearch
    # for an unbounded amount of data.
    # label: (window length in seconds, histogram interval in seconds)
    durations = {
        "3d": (259200, 2400),  # 3d, 40m
        "1d": (86400, 2400),  # 1d, 40m
        "12h": (43200, 1200),  # 12h, 20m
        "6h": (21600, 300),  # 6h, 5m
        "3h": (10800, 120),  # 3h, 2m
        "1h": (3600, 120),  # 1h, 2m
        "30m": (1800, 30),  # 30m, 30s
        "1m": (60, 1),  # 1m, 1s
        "15s": (15, 1),  # 15s, 1s
        "0": (0, 1),  # single time point
    }

    # A missing key and an explicit None are treated the same way, so none of
    # the lookups below can fail on an event without query parameters.
    params = event.get("queryStringParameters") or {}

    duration_query = params.get("duration", "3h")
    if duration_query not in durations:
        return f"Duration must be either {', '.join(durations.keys())}"

    if "datetime" in params:
        requested_time = datetime.fromisoformat(
            params["datetime"].replace("Z", "+00:00")
        )
    else:
        requested_time = datetime.now(timezone.utc)

    (duration, interval) = durations[duration_query]
    if "uploader_callsign" in params:
        # A single listener is requested, so return every second of data.
        interval = 1

    lt = requested_time
    gte = requested_time - timedelta(0, duration)

    path = "listeners-*/_search"
    payload = {
        "timeout": "30s",
        "aggs": {
            "2": {
                "terms": {
                    "field": "uploader_callsign.keyword",
                    "order": {"_key": "desc"},
                    "size": 10000,
                },
                "aggs": {
                    "3": {
                        "date_histogram": {
                            "field": "ts",
                            "fixed_interval": f"{interval}s",
                            "min_doc_count": 1,
                        },
                        "aggs": {
                            "1": {
                                "top_hits": {
                                    "size": 1,
                                    "sort": [{"ts": {"order": "desc"}}],
                                }
                            }
                        },
                    }
                },
            }
        },
        "query": {
            "bool": {
                "filter": [
                    {"match_all": {}},
                    {"range": {"ts": {"gte": gte.isoformat(), "lt": lt.isoformat()}}},
                ]
            }
        },
    }
    if "uploader_callsign" in params:
        payload["query"]["bool"]["filter"].append(
            {"match_phrase": {"uploader_callsign": str(params["uploader_callsign"])}}
        )

    results = es_request(payload, path, "POST")

    output = {}
    for listener in results["aggregations"]["2"]["buckets"]:
        output[listener["key"]] = {
            data["key_as_string"]: data["1"]["hits"]["hits"][0]["_source"]
            for data in listener["3"]["buckets"]
        }

    compressed = BytesIO()
    with gzip.GzipFile(fileobj=compressed, mode="w") as f:
        f.write(json.dumps(output).encode("utf-8"))

    return {
        "body": base64.b64encode(compressed.getvalue()).decode(),
        "isBase64Encoded": True,
        "statusCode": 200,
        "headers": {
            "Content-Encoding": "gzip",
            "content-type": "application/json",
        },
    }
2021-03-28 05:00:44 +00:00
2021-02-22 06:13:30 +00:00
def get_listeners(event, context):
    """Return the non-mobile listeners heard in the last 24 hours.

    Searches ``listeners-*`` for documents that have a position, antenna,
    software name, software version and timestamp, groups them by
    ``uploader_callsign.keyword`` (at most 500 groups) and keeps the newest
    document of each group.

    Args:
        event: Lambda event dict (unused).
        context: Lambda context (unused).

    Returns:
        A JSON string holding a list of dicts with the keys ``name``,
        ``tdiff_hours``, ``lon``, ``lat``, ``alt`` and ``description``.
        ``name`` and every value interpolated into ``description`` are
        HTML-escaped.
    """
    path = "listeners-*/_search"
    payload = {
        "timeout": "30s",
        "aggs": {
            "2": {
                "terms": {
                    "field": "uploader_callsign.keyword",
                    "order": {"_key": "desc"},
                    "size": 500,
                },
                "aggs": {
                    "1": {
                        "top_hits": {
                            "_source": False,
                            "size": 1,
                            "docvalue_fields": [
                                "uploader_position_elk",
                                "uploader_alt",
                                "uploader_antenna.keyword",
                                "software_name.keyword",
                                "software_version.keyword",
                                "ts",
                            ],
                            "sort": [{"ts": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
        "size": 0,
        "query": {
            "bool": {
                "must": [],
                "filter": [
                    {"match_all": {}},
                    {"exists": {"field": "uploader_position_elk"}},
                    {"exists": {"field": "uploader_antenna.keyword"}},
                    {"exists": {"field": "software_name.keyword"}},
                    {"exists": {"field": "software_version.keyword"}},
                    {"exists": {"field": "ts"}},
                    {
                        "range": {
                            "ts": {
                                "gte": "now-24h",
                                "lte": "now+1m",
                                "format": "strict_date_optional_time",
                            }
                        }
                    },
                ],
                "should": [],
                "must_not": [
                    {"match_phrase": {"mobile": "true"}},
                ],
            }
        },
    }

    results = es_request(payload, path, "GET")

    now = datetime.now(timezone.utc)
    output = []
    for listener in results["aggregations"]["2"]["buckets"]:
        fields = listener["1"]["hits"]["hits"][0]["fields"]

        last_seen = datetime.fromisoformat(fields["ts"][0].replace("Z", "+00:00"))
        # total_seconds() is used because timedelta.seconds ignores the days
        # component and wraps to almost 24h for a negative delta; the query
        # admits timestamps up to one minute in the future. The result is
        # clamped so that the value stays non-negative.
        tdiff_hours = max((now - last_seen).total_seconds(), 0) / 60 / 60

        # The position is a "lat,lon" string, possibly containing spaces.
        position = fields["uploader_position_elk"][0].replace(" ", "").split(",")

        software_name = html.escape(fields["software_name.keyword"][0])
        software_version = html.escape(fields["software_version.keyword"][0])
        antenna = html.escape(fields["uploader_antenna.keyword"][0])
        last_contact = html.escape(fields["ts"][0])

        # NOTE(review): the literal below is kept flush-left because that is
        # how its text appears in the reviewed copy of this file; confirm
        # whether the deployed string carries leading indentation.
        description = f"""\n
<font size=\"-2\"><BR>\n
<B>Radio: {software_name}-{software_version}</B><BR>\n
<B>Antenna: </B>{antenna}<BR>\n
<B>Last Contact: </B>{last_contact} <BR>\n
</font>\n
"""

        output.append(
            {
                "name": html.escape(listener["key"]),
                "tdiff_hours": tdiff_hours,
                "lon": float(position[1]),
                "lat": float(position[0]),
                "alt": float(fields["uploader_alt"][0]) if "uploader_alt" in fields else 0,
                "description": description,
            }
        )
    return json.dumps(output)
2021-02-02 10:44:39 +00:00
2021-02-02 07:14:38 +00:00
def es_request(payload, path, method):
    """Send a SigV4-signed, gzip-compressed JSON request to Elasticsearch.

    Args:
        payload: JSON-serialisable request body.
        path: URL path appended to ``https://{HOST}/``, for example
            ``"telm-*/_search"``.
        method: HTTP method for the request. Previously this argument was
            accepted but ignored and every request was sent as POST.

    Returns:
        The response body decoded from JSON.
    """
    body = BytesIO()
    with gzip.GzipFile(fileobj=body, mode="w") as f:
        f.write(json.dumps(payload).encode("utf-8"))

    headers = {
        "Host": HOST,
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    }
    request = AWSRequest(
        method=method,
        url=f"https://{HOST}/{path}",
        data=body.getvalue(),
        headers=headers,
    )
    # Sign with the credentials of the default boto3 session.
    SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)

    http_session = URLLib3Session()
    response = http_session.send(request.prepare())
    return json.loads(response.text)
if __name__ == "__main__":
    # Manual smoke test: running this file directly performs one live query
    # and prints the Lambda-style response.
    #
    # Earlier experiments are kept below for reference. `datanew` and
    # `get_chase` are not defined in the part of the file reviewed here.
    #
    # print(get_sondes({"queryStringParameters":{"lat":"-32.7933","lon":"151.8358","distance":"5000", "last":"604800"}}, {}))
    #
    # mode: 6hours
    # type: positions
    # format: json
    # max_positions: 0
    # position_id: 0
    # vehicles: RS_*;*chase
    #
    # print(
    #     datanew(
    #         {
    #             "queryStringParameters": {
    #                 "mode": "single",
    #                 "format": "json",
    #                 "position_id": "S1443103-2021-07-20T12:46:19.040000Z"
    #             }
    #         },
    #         {},
    #     )
    # )
    #
    # print(
    #     get_chase(
    #         {"queryStringParameters": {
    #             "duration": "1d"
    #         }
    #         },
    #         {}
    #     )
    # )
    #
    # print(
    #     datanew(
    #         {
    #             "queryStringParameters": {
    #                 "type": "positions",
    #                 "mode": "3hours",
    #                 "position_id": "0"
    #             }
    #         },
    #         {},
    #     )
    # )
    #
    # print(
    #     get_telem(
    #         {
    #             "queryStringParameters":{
    #                 # "serial": "S3210639",
    #                 "duration": "3h",
    #                 # "datetime": "2021-07-26T06:49:29.001000Z"
    #             }
    #         }, {}
    #     )
    # )
    sample_event = {
        "queryStringParameters": {
            "duration": "3d",
            "serial": "P4120469",
        }
    }
    response = get_telem(sample_event, {})
    print(response)
2021-07-21 09:54:09 +00:00