Mirror of https://github.com/projecthorus/sondehub-infra.git (synced 2025-02-21 17:26:37 +00:00)
add launchsites to history data
commit 010bed2658 (parent a4de33c33a)
@@ -93,20 +93,111 @@ def fetch_s3(serial):
         else:
             raise
 
-def write_s3(serial, data):
+def fetch_launch_sites():
+    payload = {
+        "aggs": {
+            "2": {
+                "terms": {
+                    "field": "serial.keyword",
+                    "order": {
+                        "_key": "desc"
+                    },
+                    "size": 10000
+                },
+                "aggs": {
+                    "1": {
+                        "top_hits": {
+                            "docvalue_fields": [
+                                {
+                                    "field": "launch_site.keyword"
+                                }
+                            ],
+                            "_source": "launch_site.keyword",
+                            "size": 1,
+                            "sort": [
+                                {
+                                    "datetime": {
+                                        "order": "desc"
+                                    }
+                                }
+                            ]
+                        }
+                    },
+                    "3": {
+                        "top_hits": {
+                            "docvalue_fields": [
+                                {
+                                    "field": "launch_site_range_estimate"
+                                }
+                            ],
+                            "_source": "launch_site_range_estimate",
+                            "size": 1,
+                            "sort": [
+                                {
+                                    "datetime": {
+                                        "order": "desc"
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "size": 0,
+        "_source": {
+            "excludes": []
+        },
+        "query": {
+            "bool": {
+                "must": [],
+                "filter": [
+                    {
+                        "match_all": {}
+                    },
+                    {
+                        "range": {
+                            "datetime": {
+                                "gte": "now-24h",
+                                "lte": "now",
+                                "format": "strict_date_optional_time"
+                            }
+                        }
+                    }
+                ],
+                "should": [],
+                "must_not": []
+            }
+        }
+    }
+
+    response = es_request(json.dumps(payload),
+                          "reverse-prediction-*/_search", "POST")
+    data = { x['key'] : x for x in response['aggregations']['2']['buckets']}
+    output = {}
+    for serial in data:
+        try:
+            output[serial] = {
+                "launch_site": data[serial]['1']['hits']['hits'][0]['fields']['launch_site.keyword'][0],
+                "launch_site_range_estimate": data[serial]['3']['hits']['hits'][0]['fields']['launch_site_range_estimate'][0]
+            }
+        except:
+            continue
+    return output
+
+def write_s3(serial, data, launch_sites):
     #get max alt
+    if serial in launch_sites:
+        for x in data:
+            x["launch_site"] = launch_sites[serial]["launch_site"]
+            x["launch_site_range_estimate"] = launch_sites[serial]["launch_site_range_estimate"]
     max_alt = sorted(data, key=lambda k: int(k['alt']))[-1]
     summary = [
         data[0],
         max_alt,
         data[-1]
     ]
-    dates = set([x['datetime'].split("T")[0].replace("-","/") for x in data])
-    for date in dates:
-        object = s3.Object(BUCKET,f'date/{date}/{serial}.json')
-        object.put(
-            Body=json.dumps(summary).encode("utf-8"),
-            Metadata={
+    metadata = {
                 "first-lat": str(summary[0]['lat']),
                 "first-lon": str(summary[0]['lon']),
                 "first-alt": str(summary[0]['alt']),
@@ -117,29 +208,40 @@ def write_s3(serial, data):
                 "last-lon": str(summary[2]['lon']),
                 "last-alt": str(summary[2]['alt'])
             }
+    if serial in launch_sites:
+        metadata["launch_site"] = launch_sites[serial]["launch_site"]
+
+    dates = set([x['datetime'].split("T")[0].replace("-","/") for x in data])
+
+    for date in dates:
+        object = s3.Object(BUCKET,f'date/{date}/{serial}.json')
+        object.put(
+            Body=json.dumps(summary).encode("utf-8"),
+            Metadata=metadata
         )
+
+        if serial in launch_sites:
+            object = s3.Object(BUCKET,f'launchsites/{launch_sites[serial]["launch_site"]}/{date}/{serial}.json')
+            object.put(
+                Body=json.dumps(summary).encode("utf-8"),
+                Metadata=metadata
+            )
+
     gz_data = gzip.compress(json.dumps(data).encode('utf-8'))
     object = s3.Object(BUCKET,f'serial/{serial}.json.gz')
     object.put(
         Body=gz_data,
         ContentType='application/json',
         ContentEncoding='gzip',
-        Metadata={
-            "first-lat": str(summary[0]['lat']),
-            "first-lon": str(summary[0]['lon']),
-            "first-alt": str(summary[0]['alt']),
-            "max-lat": str(summary[1]['lat']),
-            "max-lon": str(summary[1]['lon']),
-            "max-alt": str(summary[1]['alt']),
-            "last-lat": str(summary[2]['lat']),
-            "last-lon": str(summary[2]['lon']),
-            "last-alt": str(summary[2]['alt'])
-        }
+        Metadata=metadata
     )
 
+
+
 def handler(event, context):
     print(json.dumps(event))
     payloads = {}
+    launch_sites = fetch_launch_sites()
     for record in event['Records']:
         serial = record["body"]
         print(f"Getting {serial} S3")
@@ -151,7 +253,7 @@ def handler(event, context):
         data = [dict(t) for t in {tuple(d.items()) for d in data}]
         data = sorted(data, key=lambda k: k['datetime']) # sort by datetime
         print(f"Writing {serial} to s3")
-        write_s3(serial, data)
+        write_s3(serial, data, launch_sites)
         print(f"{serial} done")
 
 
@@ -162,7 +264,7 @@ if __name__ == "__main__":
            {
                "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
                "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
-                "body": "S2710639",
+                "body": "R0230678",
                "attributes": {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1627873604999",
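For orientation, here is a minimal sketch of what the new fetch_launch_sites() helper hands to write_s3() in the hunks above, as implied by the aggregation parsing. The serial and the field values are invented for illustration; only the key names and the resulting S3 key layout come from the code in this commit.

# Hypothetical example of the mapping returned by fetch_launch_sites()
# (one entry per serial seen in reverse-prediction-* over the last 24h;
# the serial and values below are made up).
launch_sites = {
    "S4630520": {
        "launch_site": "example-site-id",          # latest launch_site.keyword value
        "launch_site_range_estimate": 1234.5,      # latest launch_site_range_estimate value
    }
}
# write_s3() copies both fields onto every telemetry frame for that serial,
# adds "launch_site" to the summary object metadata, and in addition to
# date/<YYYY/MM/DD>/<serial>.json writes the summary to:
#   launchsites/example-site-id/<YYYY/MM/DD>/S4630520.json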
@@ -20,6 +20,47 @@ HOST = os.getenv("ES")
 
 def predict(event, context):
     path = "reverse-prediction-*/_search"
+
+
+    durations = {  # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't want to overload ES
+        "3d": (259200, 1200),  # 3d, 20m
+        "1d": (86400, 600),  # 1d, 10m
+        "12h": (43200, 600),  # 12h, 10m
+        "6h": (21600, 120),  # 6h, 2m
+        "3h": (10800, 60),  # 3h, 1m
+        "1h": (3600, 40),
+        "30m": (1800, 20),
+        "1m": (60, 1),
+        "15s": (15, 1),
+        "0": (0, 1)  # for getting a single time point
+    }
+    duration_query = "6h"
+
+    if (
+        "queryStringParameters" in event
+        and "duration" in event["queryStringParameters"]
+    ):
+        if event["queryStringParameters"]["duration"] in durations:
+            duration_query = event["queryStringParameters"]["duration"]
+        else:
+            return f"Duration must be either {', '.join(durations.keys())}"
+
+    if (
+        "queryStringParameters" in event
+        and "datetime" in event["queryStringParameters"]
+    ):
+        requested_time = datetime.fromisoformat(
+            event["queryStringParameters"]["datetime"].replace("Z", "+00:00")
+        )
+    else:
+        requested_time = datetime.now(timezone.utc)
+
+    (duration, interval) = durations[duration_query]
+
+    lt = requested_time + timedelta(0, 1)
+    gte = requested_time - timedelta(0, duration)
+
+
     payload = {
         "aggs": {
             "2": {
@@ -67,11 +108,7 @@ def predict(event, context):
                "filter": [
                    {
                        "range": {
-                            "datetime": {
-                                "gte": "now-6h",
-                                "lte": "now",
-                                "format": "strict_date_optional_time"
-                            }
+                            "datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
                        }
                    }
                ],
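For the predict() changes, a rough sketch of an API Gateway-style event that exercises the new query-string handling; this is an assumption-laden illustration, not code from the commit, and the timestamp is invented.

# Hypothetical test event for the new parameters read by predict().
# Only the parameter names and accepted values come from the diff above.
event = {
    "queryStringParameters": {
        "duration": "3h",                    # must be a key of `durations`; defaults to "6h" if omitted
        "datetime": "2021-08-02T06:00:00Z",  # optional; "Z" is rewritten to "+00:00", defaults to now (UTC)
    }
}
# predict(event, None) would then query reverse-prediction-* for the three hours
# up to the requested time instead of the previously hard-coded "now-6h" to "now".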