diff --git a/lambda/predict_updater/__init__.py b/lambda/predict_updater/__init__.py index 6e3d841..c2d8946 100644 --- a/lambda/predict_updater/__init__.py +++ b/lambda/predict_updater/__init__.py @@ -5,7 +5,8 @@ import math import logging from math import radians, degrees, sin, cos, atan2, sqrt, pi import es - +import asyncio +import functools # FLIGHT PROFILE DEFAULTS @@ -240,7 +241,7 @@ def compare_launch_sites(sites, launch_estimate, altitude=0): return None -def get_standard_prediction(conn, timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude'], descent_rate=PREDICT_DEFAULTS['descent_rate']): +def get_standard_prediction(timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude'], descent_rate=PREDICT_DEFAULTS['descent_rate']): """ Request a standard flight path prediction from Tawhiri. Notes: @@ -264,6 +265,7 @@ def get_standard_prediction(conn, timestamp, latitude, longitude, altitude, curr # Generate the prediction URL url = f"/api/v1/?launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&launch_altitude={altitude:.2f}&ascent_rate={ascent_rate:.2f}&burst_altitude={burst_altitude:.2f}&descent_rate={descent_rate:.2f}" logging.debug(url) + conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org") conn.request("GET", url) res = conn.getresponse() data = res.read() @@ -297,7 +299,7 @@ def get_standard_prediction(conn, timestamp, latitude, longitude, altitude, curr return None -def get_launch_estimate(conn, timestamp, latitude, longitude, altitude, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], current_rate=5.0): +async def get_launch_estimate(timestamp, latitude, longitude, altitude, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], current_rate=5.0): """ Estimate the launch site of a sonde based on a current ascent position. @@ -320,6 +322,7 @@ def get_launch_estimate(conn, timestamp, latitude, longitude, altitude, ascent_r # Generate the prediction URL url = f"/api/v1/?profile=reverse_profile&launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&launch_altitude={altitude:.2f}&ascent_rate={ascent_rate:.2f}" logging.debug(url) + conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org") conn.request("GET", url) res = conn.getresponse() data = res.read() @@ -424,6 +427,11 @@ def bulk_upload_es(index_prefix,payloads): raise RuntimeError def predict(event, context): + # Use asyncio.run to synchronously "await" an async function + result = asyncio.run(predict_async(event, context)) + return result + +async def predict_async(event, context): path = "telm-*/_search" payload = { "aggs": { @@ -583,151 +591,18 @@ def predict(event, context): reverse_predictions = get_reverse_predictions() - conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org") serial_data={} reverse_serial_data = {} logging.debug("Start Predict") + jobs=[] for serial in serials: - - value = serials[serial] - - # - # Flight Profile selection - # - # Fallback Option - use flight profile data based on sonde type. - _flight_profile = flight_profile_by_type(value['type']) + jobs.append(run_predictions_for_serial(serial, serials[serial], reverse_predictions, launch_sites)) + output = await asyncio.gather(*jobs) + for data in output: + if data: + serial_data[data[0]] = data[1] - # Check if we have already run a reverse prediction on this serial - if serial in reverse_predictions: - logging.debug(f"Found reverse prediction for {serial}.") - _rev_pred = reverse_predictions[serial] - - #print(_rev_pred) - - if 'launch_site' in _rev_pred: - # This serial number has been assigned to a launch site! - # Grab the launch site information - _site_info = launch_sites[_rev_pred['launch_site']] - - # If we have flight profile data, update the default flight profile - if 'ascent_rate' in _site_info: - _flight_profile['ascent_rate'] = _site_info['ascent_rate'] - - if 'burst_altitude' in _site_info: - _flight_profile['burst_altitude'] = _site_info['burst_altitude'] - - if 'descent_rate' in _site_info: - _flight_profile['descent_rate'] = _site_info['descent_rate'] - - logging.debug(f"{serial} - Using Flight Profile data for Launch site: {_site_info['station_name']}") - else: - # No launch site was allocated... - # TODO - Try again? - pass - - else: - # No reverse prediction data! - # We can only run a reverse prediction with a sonde on ascent. - #print(f"{serial}: {value['rate']}") - if value['rate'] > 0.5: - - # Try and run a reverse prediction - logging.info(f"Running reverse predict for {serial}") - - longitude = float(value['position'][1].strip()) - latitude = float(value['position'][0].strip()) - - _rev_pred = get_launch_estimate( - conn, - value['time'], - latitude, - longitude, - value['alt'], - current_rate=value['rate'], - ascent_rate=value['rate'], - ) - - if _rev_pred: - - # Attempt to find a launch site near to the launch estimate. - _launch_estimate = [_rev_pred['launch_estimate']['latitude'], _rev_pred['launch_estimate']['longitude'], _rev_pred['launch_estimate']['altitude']] - _alloc_site = compare_launch_sites(launch_sites, _launch_estimate, value['alt']) - - if _alloc_site: - # We have found the launch site! - # {'site':_site, 'range': launch_site_range} - logging.info(f"Allocated {serial} to launch site {launch_sites[_alloc_site['site']]['station_name']} ({_alloc_site['site']}) with range {_alloc_site['range']:.1f}.") - - # Add launch site into the prediction data - _rev_pred['launch_site'] = _alloc_site['site'] - _rev_pred['launch_site_range_estimate'] = _alloc_site['range'] - - # If we have flight profile data, update the default flight profile - _site_info = launch_sites[_alloc_site['site']] - if 'ascent_rate' in _site_info: - _flight_profile['ascent_rate'] = _site_info['ascent_rate'] - - if 'burst_altitude' in _site_info: - _flight_profile['burst_altitude'] = _site_info['burst_altitude'] - - if 'descent_rate' in _site_info: - _flight_profile['descent_rate'] = _site_info['descent_rate'] - - - # Add to dict for upload later. - reverse_serial_data[serial] = _rev_pred - - else: - # Launch estimate prediction failed. - pass - - - - #print(value) - #print(_flight_profile) - logging.debug(f"Running prediction for {serial} using flight profile {str(_flight_profile)}.") - - # Determine current ascent rate - # If the value is < 0.5 (e.g. we are on descent, or not moving), we just use a default value. - ascent_rate=value['rate'] if value['rate'] > 0.5 else _flight_profile['ascent_rate'] - - # If we are on descent, estimate the sea-level descent rate from the current descent rate - # Otherwise, use the flight profile descent rate - descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt']) if value['rate'] < 0 else _flight_profile['descent_rate'] - - # If the resultant sea-level descent rate is very small, it means we're probably landed - # so dont run a prediction for this sonde. - if descent_rate < 0.5: - continue - - # Now to determine the burst altitude - if value['rate'] < 0: - # On descent (rate < 0), we need to set the burst altitude just higher than our current altitude for - # the predictor to be happy - burst_altitude = value['alt']+0.05 - else: - # Otherwise, on ascent we either use the expected burst altitude, or we - # add a little bit on to our current altitude. - burst_altitude = (value['alt']+0.05) if value['alt'] > _flight_profile['burst_altitude'] else _flight_profile['burst_altitude'] - - longitude = float(value['position'][1].strip()) - latitude = float(value['position'][0].strip()) - - #print(f"Prediction Parameters for {serial} at {latitude}, {longitude}, {value['alt']}: {ascent_rate}/{burst_altitude}/{descent_rate}") - - # Run prediction! This will return None if there is an error - serial_data[serial] = get_standard_prediction( - conn, - value['time'], - latitude, - longitude, - value['alt'], - current_rate=value['rate'], - ascent_rate=ascent_rate, - burst_altitude=burst_altitude, - descent_rate=descent_rate - ) @@ -799,3 +674,143 @@ def predict(event, context): +async def run_predictions_for_serial(serial, value, reverse_predictions, launch_sites): + loop = asyncio.get_event_loop() + # + # Flight Profile selection + # + # Fallback Option - use flight profile data based on sonde type. + _flight_profile = flight_profile_by_type(value['type']) + + + # Check if we have already run a reverse prediction on this serial + if serial in reverse_predictions: + logging.debug(f"Found reverse prediction for {serial}.") + _rev_pred = reverse_predictions[serial] + + #print(_rev_pred) + + if 'launch_site' in _rev_pred: + # This serial number has been assigned to a launch site! + # Grab the launch site information + _site_info = launch_sites[_rev_pred['launch_site']] + + # If we have flight profile data, update the default flight profile + if 'ascent_rate' in _site_info: + _flight_profile['ascent_rate'] = _site_info['ascent_rate'] + + if 'burst_altitude' in _site_info: + _flight_profile['burst_altitude'] = _site_info['burst_altitude'] + + if 'descent_rate' in _site_info: + _flight_profile['descent_rate'] = _site_info['descent_rate'] + + logging.debug(f"{serial} - Using Flight Profile data for Launch site: {_site_info['station_name']}") + else: + # No launch site was allocated... + # TODO - Try again? + pass + + else: + # No reverse prediction data! + # We can only run a reverse prediction with a sonde on ascent. + #print(f"{serial}: {value['rate']}") + if value['rate'] > 0.5: + + # Try and run a reverse prediction + logging.info(f"Running reverse predict for {serial}") + + longitude = float(value['position'][1].strip()) + latitude = float(value['position'][0].strip()) + + + + _rev_pred = await loop.run_in_executor(None, functools.partial(get_launch_estimate, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=value['rate'], + )) + + if _rev_pred: + + # Attempt to find a launch site near to the launch estimate. + _launch_estimate = [_rev_pred['launch_estimate']['latitude'], _rev_pred['launch_estimate']['longitude'], _rev_pred['launch_estimate']['altitude']] + _alloc_site = compare_launch_sites(launch_sites, _launch_estimate, value['alt']) + + if _alloc_site: + # We have found the launch site! + # {'site':_site, 'range': launch_site_range} + logging.info(f"Allocated {serial} to launch site {launch_sites[_alloc_site['site']]['station_name']} ({_alloc_site['site']}) with range {_alloc_site['range']:.1f}.") + + # Add launch site into the prediction data + _rev_pred['launch_site'] = _alloc_site['site'] + _rev_pred['launch_site_range_estimate'] = _alloc_site['range'] + + # If we have flight profile data, update the default flight profile + _site_info = launch_sites[_alloc_site['site']] + if 'ascent_rate' in _site_info: + _flight_profile['ascent_rate'] = _site_info['ascent_rate'] + + if 'burst_altitude' in _site_info: + _flight_profile['burst_altitude'] = _site_info['burst_altitude'] + + if 'descent_rate' in _site_info: + _flight_profile['descent_rate'] = _site_info['descent_rate'] + + + # Add to dict for upload later. + reverse_serial_data[serial] = _rev_pred + + else: + # Launch estimate prediction failed. + pass + + + + #print(value) + #print(_flight_profile) + logging.debug(f"Running prediction for {serial} using flight profile {str(_flight_profile)}.") + + # Determine current ascent rate + # If the value is < 0.5 (e.g. we are on descent, or not moving), we just use a default value. + ascent_rate=value['rate'] if value['rate'] > 0.5 else _flight_profile['ascent_rate'] + + # If we are on descent, estimate the sea-level descent rate from the current descent rate + # Otherwise, use the flight profile descent rate + descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt']) if value['rate'] < 0 else _flight_profile['descent_rate'] + + # If the resultant sea-level descent rate is very small, it means we're probably landed + # so dont run a prediction for this sonde. + if descent_rate < 0.5: + return False + + # Now to determine the burst altitude + if value['rate'] < 0: + # On descent (rate < 0), we need to set the burst altitude just higher than our current altitude for + # the predictor to be happy + burst_altitude = value['alt']+0.05 + else: + # Otherwise, on ascent we either use the expected burst altitude, or we + # add a little bit on to our current altitude. + burst_altitude = (value['alt']+0.05) if value['alt'] > _flight_profile['burst_altitude'] else _flight_profile['burst_altitude'] + + longitude = float(value['position'][1].strip()) + latitude = float(value['position'][0].strip()) + + #print(f"Prediction Parameters for {serial} at {latitude}, {longitude}, {value['alt']}: {ascent_rate}/{burst_altitude}/{descent_rate}") + + # Run prediction! This will return None if there is an error + return [serial, await loop.run_in_executor(None, functools.partial(get_standard_prediction, + value['time'], + latitude, + longitude, + value['alt'], + current_rate=value['rate'], + ascent_rate=ascent_rate, + burst_altitude=burst_altitude, + descent_rate=descent_rate + ))] + diff --git a/predictor.tf b/predictor.tf index bc9ebae..8b8fd3a 100644 --- a/predictor.tf +++ b/predictor.tf @@ -67,7 +67,7 @@ resource "aws_lambda_function" "predict_updater" { s3_key = aws_s3_bucket_object.lambda.key source_code_hash = data.archive_file.lambda.output_base64sha256 publish = true - memory_size = 1024 + memory_size = 512 role = aws_iam_role.predict_updater.arn runtime = "python3.9" architectures = ["arm64"]