mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2025-04-07 19:14:15 +00:00
add a way to upload and download reverse predictions
This commit is contained in:
parent
9e8db61ed5
commit
35d695cf82
@ -306,6 +306,54 @@ def get_launch_estimate(conn, timestamp, latitude, longitude, altitude, ascent_r
|
||||
else:
|
||||
return None
|
||||
|
||||
# return a dict key'd by serial with reverse prediction data
|
||||
def get_reverse_predictions():
|
||||
path = "reverse-prediction-*/_search"
|
||||
payload = {
|
||||
"size": 1000,
|
||||
"sort": [
|
||||
{
|
||||
"datetime": {
|
||||
"order": "asc",
|
||||
"unmapped_type": "boolean"
|
||||
}
|
||||
}
|
||||
],
|
||||
"query": {
|
||||
"bool": {
|
||||
"filter": [
|
||||
{
|
||||
"range": {
|
||||
"datetime": {
|
||||
"gte": "now-1d",
|
||||
"lte": "now",
|
||||
"format": "strict_date_optional_time"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
logging.debug("Start ES Request")
|
||||
results = es_request(json.dumps(payload), path, "POST")
|
||||
logging.debug("Finished ES Request")
|
||||
return { x['_source']['serial'] : x['_source'] for x in results['hits']['hits']}
|
||||
|
||||
def bulk_upload_es(index_prefix,payloads):
|
||||
body=""
|
||||
for payload in payloads:
|
||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||
body += "\n"
|
||||
date_prefix = datetime.now().strftime("%Y-%m")
|
||||
result = es_request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST")
|
||||
|
||||
if 'errors' in result and result['errors'] == True:
|
||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
|
||||
if error_types:
|
||||
print(result)
|
||||
raise RuntimeError
|
||||
|
||||
def predict(event, context):
|
||||
path = "telm-*/_search"
|
||||
@ -553,21 +601,7 @@ def predict(event, context):
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# ES bulk update
|
||||
body=""
|
||||
for payload in output:
|
||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||
body += "\n"
|
||||
index = datetime.now().strftime("%Y-%m")
|
||||
result = es_request(body, f"predictions-{index}/_doc/_bulk", "POST")
|
||||
if 'errors' in result and result['errors'] == True:
|
||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
|
||||
if error_types:
|
||||
print(event)
|
||||
print(result)
|
||||
raise RuntimeError
|
||||
bulk_upload_es("predictions", output)
|
||||
|
||||
logging.debug("Finished")
|
||||
return
|
||||
@ -615,8 +649,22 @@ if __name__ == "__main__":
|
||||
# print(f"{_serial['serial']}: {len(_serial['data'])}")
|
||||
|
||||
|
||||
print(predict(
|
||||
{},{}
|
||||
))
|
||||
|
||||
# print(predict(
|
||||
# {},{}
|
||||
# ))
|
||||
bulk_upload_es("reverse-prediction",[{
|
||||
"datetime" : "2021-10-04",
|
||||
"data" : { },
|
||||
"serial" : "R12341234",
|
||||
"station" : "-2",
|
||||
"subtype" : "RS41-SGM",
|
||||
"ascent_rate" : "5",
|
||||
"alt" : 1000,
|
||||
"position" : [
|
||||
1,
|
||||
2
|
||||
],
|
||||
"type" : "RS41"
|
||||
}]
|
||||
)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user