From 2923f306544943b89906dead3413ab3baee2061e Mon Sep 17 00:00:00 2001 From: Michaela Wheeler Date: Thu, 5 May 2022 18:06:02 +1000 Subject: [PATCH] Use flightdoc if avaliable (#73) * Use flightdoc if avaliable * Terraform fmt [skip ci] Co-authored-by: xss Co-authored-by: TheSkorm --- es.tf | 18 +- ham_query.tf | 24 +-- lambda/ham_predict_updater/__init__.py | 253 +++++++++++++++++++------ 3 files changed, 213 insertions(+), 82 deletions(-) diff --git a/es.tf b/es.tf index b1e65dd..13d213a 100644 --- a/es.tf +++ b/es.tf @@ -109,8 +109,8 @@ resource "aws_cognito_identity_pool" "CognitoIdentityPool" { } cognito_identity_providers { - client_id = "u3ggvo1spp1e6cffbietq7fbm" - provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM" + client_id = "u3ggvo1spp1e6cffbietq7fbm" + provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM" } } @@ -127,16 +127,16 @@ resource "aws_cognito_identity_pool_roles_attachment" "CognitoIdentityPoolRoleAt type = "Token" } role_mapping { - ambiguous_role_resolution = "AuthenticatedRole" - identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:227g2bbcb2tqjfii1ipt2tj5m6" - type = "Token" + ambiguous_role_resolution = "AuthenticatedRole" + identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:227g2bbcb2tqjfii1ipt2tj5m6" + type = "Token" } role_mapping { - ambiguous_role_resolution = "AuthenticatedRole" - identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:u3ggvo1spp1e6cffbietq7fbm" - type = "Token" - } + ambiguous_role_resolution = "AuthenticatedRole" + identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:u3ggvo1spp1e6cffbietq7fbm" + type = "Token" + } role_mapping { ambiguous_role_resolution = "AuthenticatedRole" identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:7v892rnrta8ms785pl0aaqo8ke" diff --git a/ham_query.tf b/ham_query.tf index 09a2563..552453d 100644 --- a/ham_query.tf +++ b/ham_query.tf @@ -130,20 +130,20 @@ resource "aws_lambda_permission" "ham_get_listener_telemetry" { } resource "aws_apigatewayv2_route" "ham_get_listener_telemetry" { - api_id = aws_apigatewayv2_api.main.id - api_key_required = false - authorization_type = "NONE" - route_key = "GET /amateur/listeners/telemetry" - target = "integrations/${aws_apigatewayv2_integration.ham_get_listener_telemetry.id}" + api_id = aws_apigatewayv2_api.main.id + api_key_required = false + authorization_type = "NONE" + route_key = "GET /amateur/listeners/telemetry" + target = "integrations/${aws_apigatewayv2_integration.ham_get_listener_telemetry.id}" } resource "aws_apigatewayv2_integration" "ham_get_listener_telemetry" { - api_id = aws_apigatewayv2_api.main.id - connection_type = "INTERNET" - integration_method = "POST" - integration_type = "AWS_PROXY" - integration_uri = aws_lambda_function.ham_get_listener_telemetry.arn - timeout_milliseconds = 30000 - payload_format_version = "2.0" + api_id = aws_apigatewayv2_api.main.id + connection_type = "INTERNET" + integration_method = "POST" + integration_type = "AWS_PROXY" + integration_uri = aws_lambda_function.ham_get_listener_telemetry.arn + timeout_milliseconds = 30000 + payload_format_version = "2.0" } \ No newline at end of file diff --git a/lambda/ham_predict_updater/__init__.py b/lambda/ham_predict_updater/__init__.py index 084d42c..87cd07f 100644 --- a/lambda/ham_predict_updater/__init__.py +++ b/lambda/ham_predict_updater/__init__.py @@ -1,5 +1,6 @@ import json from datetime import datetime +from datetime import timedelta import http.client import math import logging @@ -29,18 +30,44 @@ LAUNCH_ALLOCATE_RANGE_MAX = 30000 # metres LAUNCH_ALLOCATE_RANGE_SCALING = 1.5 # Scaling factor - launch allocation range is min(current alt * this value , launch allocate range max) # Do not run predictions if the ascent or descent rate is less than this value -ASCENT_RATE_THRESHOLD = 0.8 +ASCENT_RATE_THRESHOLD = 0.5 -def flight_profile_by_type(sonde_type): - """ - Determine the appropriate flight profile based on radiosonde type - """ +def get_flight_docs(): + path = "flight-doc/_search" + payload = { + "aggs": { + "payload_callsign": { + "terms": { + "field": "payload_callsign.keyword", + "order": { + "_key": "desc" + }, + "size": 10000 + }, + "aggs": { + "flight_doc": { + "top_hits": { + "_source": True, + "size": 1, + "sort": [ + { + "datetime": { + "order": "desc" + } + } + ] + } + } + } + } + }, + "size": 0 + } + logging.debug("Start ES Request") + results = es.request(json.dumps(payload), path, "POST") + logging.debug("Finished ES Request") + return { x['key'] : x['flight_doc']['hits']['hits'][0]['_source'] for x in results['aggregations']['payload_callsign']['buckets']} - for _def_type in SONDE_TYPE_PREDICT_DEFAULTS: - if _def_type in sonde_type: - return SONDE_TYPE_PREDICT_DEFAULTS[_def_type].copy() - - return PREDICT_DEFAULTS.copy() def getDensity(altitude): @@ -201,6 +228,55 @@ def position_info(listener, balloon): "elevation_radians": elevation, } +def get_float_prediction(timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude']): + """ + Request a float flight path prediction from Tawhiri. + Notes: + - The burst_altitude must be higher than the current altitude. + - Longitude is in the range 0-360.0 + - All ascent/descent rates must be positive. + """ + + + # Shift longitude into the appropriate range for Tawhiri + if longitude < 0: + longitude += 360.0 + + # Generate the prediction URL + url = f"/api/v1/?launch_altitude={altitude}&launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&float_altitude={burst_altitude:.2f}&stop_time={(datetime.now() + timedelta(days=1)).isoformat()}Z&ascent_rate={ascent_rate:.2f}&profile=float_profile" + logging.debug(url) + conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org") + conn.request("GET", url) + res = conn.getresponse() + data = res.read() + + if res.code != 200: + logging.debug(data) + return None + + pred_data = json.loads(data.decode("utf-8")) + + path = [] + + if 'prediction' in pred_data: + for stage in pred_data['prediction']: + # Probably don't need to worry about this, it should only result in one or two points + # in 'ascent'. + if stage['stage'] == 'ascent' and current_rate < 0: # ignore ascent stage if we have already burst + continue + else: + for item in stage['trajectory']: + path.append({ + "time": int(datetime.fromisoformat(item['datetime'].split(".")[0].replace("Z","")).timestamp()), + "lat": item['latitude'], + "lon": item['longitude'] - 360 if item['longitude'] > 180 else item['longitude'], + "alt": item['altitude'], + }) + + pred_data['path'] = path + return pred_data + else: + return None def get_standard_prediction(timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude'], descent_rate=PREDICT_DEFAULTS['descent_rate']): """ @@ -261,6 +337,7 @@ def get_standard_prediction(timestamp, latitude, longitude, altitude, current_ra + def bulk_upload_es(index_prefix,payloads): body="" for payload in payloads: @@ -282,6 +359,8 @@ def predict(event, context): return result async def predict_async(event, context): + flight_docs = get_flight_docs() + sem = asyncio.Semaphore(5) path = "ham-telm-*/_search" interval = 60 # because some aprs balloons are only every minute @@ -445,7 +524,7 @@ async def predict_async(event, context): logging.debug("Start Predict") jobs=[] for serial in serials: - jobs.append(run_predictions_for_serial(sem, serial, serials[serial])) + jobs.append(run_predictions_for_serial(sem, flight_docs, serial, serials[serial])) output = await asyncio.gather(*jobs) for data in output: if data: @@ -489,61 +568,113 @@ async def predict_async(event, context): -async def run_predictions_for_serial(sem, serial, value): +async def run_predictions_for_serial(sem, flight_docs, serial, value): async with sem: loop = asyncio.get_event_loop() - # - # Flight Profile selection - # - # Fallback Option - use flight profile data based on sonde type. - _flight_profile = flight_profile_by_type("") - - #print(value) - #print(_flight_profile) - logging.debug(f"Running prediction for {serial} using flight profile {str(_flight_profile)}.") - - # skip when close to 0. - if value['rate'] < ASCENT_RATE_THRESHOLD and value['rate'] > -ASCENT_RATE_THRESHOLD: - logging.debug(f"Skipping {serial} due to ascent rate limiting.") - return False - - # Determine current ascent rate - # If the value is < 0.5 (e.g. we are on descent, or not moving), we just use a default value. - ascent_rate=value['rate'] if value['rate'] > ASCENT_RATE_THRESHOLD else _flight_profile['ascent_rate'] + logging.debug(f"Starting prediction for {serial}") + try: + _flight_doc = flight_docs[serial] + logging.debug(f"Found flight doc for {serial}") + except: + logging.debug(f"Using default flight doc for {serial}") + _flight_doc = { + "float_expected": False, + "peak_altitude": 30000, + "descent_rate": 5 + } - # If we are on descent, estimate the sea-level descent rate from the current descent rate - # Otherwise, use the flight profile descent rate - descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt']) if value['rate'] < 0 else _flight_profile['descent_rate'] - - # If the resultant sea-level descent rate is very small, it means we're probably landed - # so dont run a prediction for this sonde. - if descent_rate < ASCENT_RATE_THRESHOLD: - return False - - # Now to determine the burst altitude - if value['rate'] < 0: - # On descent (rate < 0), we need to set the burst altitude just higher than our current altitude for - # the predictor to be happy - burst_altitude = value['alt']+0.05 - else: - # Otherwise, on ascent we either use the expected burst altitude, or we - # add a little bit on to our current altitude. - burst_altitude = (value['alt']+0.05) if value['alt'] > _flight_profile['burst_altitude'] else _flight_profile['burst_altitude'] - longitude = float(value['position'][1].strip()) latitude = float(value['position'][0].strip()) - #print(f"Prediction Parameters for {serial} at {latitude}, {longitude}, {value['alt']}: {ascent_rate}/{burst_altitude}/{descent_rate}") + + # Now to determine the burst altitude + if value['rate'] > 0: + if value['alt'] > _flight_doc['peak_altitude']: + burst_altitude = value['alt']+0.05 # balloon past the prediction - use just a little bit higher + else: + burst_altitude = _flight_doc['peak_altitude'] + else: #descending + burst_altitude = value['alt']+0.05 - # Run prediction! This will return None if there is an error - return [serial, await loop.run_in_executor(None, functools.partial(get_standard_prediction, - value['time'], - latitude, - longitude, - value['alt'], - current_rate=value['rate'], - ascent_rate=ascent_rate, - burst_altitude=burst_altitude, - descent_rate=descent_rate - ))] + logging.debug(f"Burst alt for {serial} is {burst_altitude}") + + # balloon on descent + if value['rate'] < -ASCENT_RATE_THRESHOLD: + logging.debug(f"{serial} running normal descent profile") + descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt']) + return [ + serial, + await loop.run_in_executor( + None, + functools.partial( + get_standard_prediction, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=5, # this doesn't matter because we are burst + burst_altitude=burst_altitude, + descent_rate=descent_rate + ) + ) + ] + if _flight_doc["float_expected"]: + if abs(value['rate']) < ASCENT_RATE_THRESHOLD: + logging.debug(f"{serial} running float profile") + return [ + serial, + await loop.run_in_executor( + None, + functools.partial( + get_float_prediction, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=1, # this doesn't matter because we are floating + burst_altitude=burst_altitude, + ) + ) + ] + else: # ascending to float + logging.debug(f"{serial} running ascending float profile") + if abs(value['rate']) > ASCENT_RATE_THRESHOLD: + return [ + serial, + await loop.run_in_executor( + None, + functools.partial( + get_float_prediction, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=value['rate'], + burst_altitude=burst_altitude, + ) + ) + ] + + logging.debug(f"{serial} running standard ascent profile") + # standard balloon on ascent + return [ + serial, + await loop.run_in_executor( + None, + functools.partial( + get_standard_prediction, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=value['rate'], + burst_altitude=burst_altitude, + descent_rate=_flight_doc['descent_rate'] + ) + ) + ]