diff --git a/predict_updater/lambda_function.py b/predict_updater/lambda_function.py index 042d955..5923a74 100644 --- a/predict_updater/lambda_function.py +++ b/predict_updater/lambda_function.py @@ -306,6 +306,54 @@ def get_launch_estimate(conn, timestamp, latitude, longitude, altitude, ascent_r else: return None +# return a dict key'd by serial with reverse prediction data +def get_reverse_predictions(): + path = "reverse-prediction-*/_search" + payload = { + "size": 1000, + "sort": [ + { + "datetime": { + "order": "asc", + "unmapped_type": "boolean" + } + } + ], + "query": { + "bool": { + "filter": [ + { + "range": { + "datetime": { + "gte": "now-1d", + "lte": "now", + "format": "strict_date_optional_time" + } + } + } + ] + } + } + } + logging.debug("Start ES Request") + results = es_request(json.dumps(payload), path, "POST") + logging.debug("Finished ES Request") + return { x['_source']['serial'] : x['_source'] for x in results['hits']['hits']} + +def bulk_upload_es(index_prefix,payloads): + body="" + for payload in payloads: + body += "{\"index\":{}}\n" + json.dumps(payload) + "\n" + body += "\n" + date_prefix = datetime.now().strftime("%Y-%m") + result = es_request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST") + + if 'errors' in result and result['errors'] == True: + error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types + error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed + if error_types: + print(result) + raise RuntimeError def predict(event, context): path = "telm-*/_search" @@ -553,21 +601,7 @@ def predict(event, context): } ) - - # ES bulk update - body="" - for payload in output: - body += "{\"index\":{}}\n" + json.dumps(payload) + "\n" - body += "\n" - index = datetime.now().strftime("%Y-%m") - result = es_request(body, f"predictions-{index}/_doc/_bulk", "POST") - if 'errors' in result and result['errors'] == True: - error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types - error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed - if error_types: - print(event) - print(result) - raise RuntimeError + bulk_upload_es("predictions", output) logging.debug("Finished") return @@ -615,8 +649,22 @@ if __name__ == "__main__": # print(f"{_serial['serial']}: {len(_serial['data'])}") - print(predict( - {},{} - )) - + # print(predict( + # {},{} + # )) + bulk_upload_es("reverse-prediction",[{ + "datetime" : "2021-10-04", + "data" : { }, + "serial" : "R12341234", + "station" : "-2", + "subtype" : "RS41-SGM", + "ascent_rate" : "5", + "alt" : 1000, + "position" : [ + 1, + 2 + ], + "type" : "RS41" + }] + )