Use flightdoc if avaliable (#73)

* Use flightdoc if avaliable

* Terraform fmt [skip ci]

Co-authored-by: xss <michaela@michaela.lgbt>
Co-authored-by: TheSkorm <TheSkorm@users.noreply.github.com>
This commit is contained in:
Michaela Wheeler 2022-05-05 18:06:02 +10:00 committed by GitHub
parent c6d5ea17f8
commit 2923f30654
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 213 additions and 82 deletions

18
es.tf
View File

@ -109,8 +109,8 @@ resource "aws_cognito_identity_pool" "CognitoIdentityPool" {
}
cognito_identity_providers {
client_id = "u3ggvo1spp1e6cffbietq7fbm"
provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM"
client_id = "u3ggvo1spp1e6cffbietq7fbm"
provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM"
}
}
@ -127,16 +127,16 @@ resource "aws_cognito_identity_pool_roles_attachment" "CognitoIdentityPoolRoleAt
type = "Token"
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:227g2bbcb2tqjfii1ipt2tj5m6"
type = "Token"
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:227g2bbcb2tqjfii1ipt2tj5m6"
type = "Token"
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:u3ggvo1spp1e6cffbietq7fbm"
type = "Token"
}
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:u3ggvo1spp1e6cffbietq7fbm"
type = "Token"
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:7v892rnrta8ms785pl0aaqo8ke"

View File

@ -130,20 +130,20 @@ resource "aws_lambda_permission" "ham_get_listener_telemetry" {
}
resource "aws_apigatewayv2_route" "ham_get_listener_telemetry" {
api_id = aws_apigatewayv2_api.main.id
api_key_required = false
authorization_type = "NONE"
route_key = "GET /amateur/listeners/telemetry"
target = "integrations/${aws_apigatewayv2_integration.ham_get_listener_telemetry.id}"
api_id = aws_apigatewayv2_api.main.id
api_key_required = false
authorization_type = "NONE"
route_key = "GET /amateur/listeners/telemetry"
target = "integrations/${aws_apigatewayv2_integration.ham_get_listener_telemetry.id}"
}
resource "aws_apigatewayv2_integration" "ham_get_listener_telemetry" {
api_id = aws_apigatewayv2_api.main.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.ham_get_listener_telemetry.arn
timeout_milliseconds = 30000
payload_format_version = "2.0"
api_id = aws_apigatewayv2_api.main.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.ham_get_listener_telemetry.arn
timeout_milliseconds = 30000
payload_format_version = "2.0"
}

View File

@ -1,5 +1,6 @@
import json
from datetime import datetime
from datetime import timedelta
import http.client
import math
import logging
@ -29,18 +30,44 @@ LAUNCH_ALLOCATE_RANGE_MAX = 30000 # metres
LAUNCH_ALLOCATE_RANGE_SCALING = 1.5 # Scaling factor - launch allocation range is min(current alt * this value , launch allocate range max)
# Do not run predictions if the ascent or descent rate is less than this value
ASCENT_RATE_THRESHOLD = 0.8
ASCENT_RATE_THRESHOLD = 0.5
def flight_profile_by_type(sonde_type):
"""
Determine the appropriate flight profile based on radiosonde type
"""
def get_flight_docs():
path = "flight-doc/_search"
payload = {
"aggs": {
"payload_callsign": {
"terms": {
"field": "payload_callsign.keyword",
"order": {
"_key": "desc"
},
"size": 10000
},
"aggs": {
"flight_doc": {
"top_hits": {
"_source": True,
"size": 1,
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
}
}
}
},
"size": 0
}
logging.debug("Start ES Request")
results = es.request(json.dumps(payload), path, "POST")
logging.debug("Finished ES Request")
return { x['key'] : x['flight_doc']['hits']['hits'][0]['_source'] for x in results['aggregations']['payload_callsign']['buckets']}
for _def_type in SONDE_TYPE_PREDICT_DEFAULTS:
if _def_type in sonde_type:
return SONDE_TYPE_PREDICT_DEFAULTS[_def_type].copy()
return PREDICT_DEFAULTS.copy()
def getDensity(altitude):
@ -201,6 +228,55 @@ def position_info(listener, balloon):
"elevation_radians": elevation,
}
def get_float_prediction(timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude']):
"""
Request a float flight path prediction from Tawhiri.
Notes:
- The burst_altitude must be higher than the current altitude.
- Longitude is in the range 0-360.0
- All ascent/descent rates must be positive.
"""
# Shift longitude into the appropriate range for Tawhiri
if longitude < 0:
longitude += 360.0
# Generate the prediction URL
url = f"/api/v1/?launch_altitude={altitude}&launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&float_altitude={burst_altitude:.2f}&stop_time={(datetime.now() + timedelta(days=1)).isoformat()}Z&ascent_rate={ascent_rate:.2f}&profile=float_profile"
logging.debug(url)
conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org")
conn.request("GET", url)
res = conn.getresponse()
data = res.read()
if res.code != 200:
logging.debug(data)
return None
pred_data = json.loads(data.decode("utf-8"))
path = []
if 'prediction' in pred_data:
for stage in pred_data['prediction']:
# Probably don't need to worry about this, it should only result in one or two points
# in 'ascent'.
if stage['stage'] == 'ascent' and current_rate < 0: # ignore ascent stage if we have already burst
continue
else:
for item in stage['trajectory']:
path.append({
"time": int(datetime.fromisoformat(item['datetime'].split(".")[0].replace("Z","")).timestamp()),
"lat": item['latitude'],
"lon": item['longitude'] - 360 if item['longitude'] > 180 else item['longitude'],
"alt": item['altitude'],
})
pred_data['path'] = path
return pred_data
else:
return None
def get_standard_prediction(timestamp, latitude, longitude, altitude, current_rate=5.0, ascent_rate=PREDICT_DEFAULTS['ascent_rate'], burst_altitude=PREDICT_DEFAULTS['burst_altitude'], descent_rate=PREDICT_DEFAULTS['descent_rate']):
"""
@ -261,6 +337,7 @@ def get_standard_prediction(timestamp, latitude, longitude, altitude, current_ra
def bulk_upload_es(index_prefix,payloads):
body=""
for payload in payloads:
@ -282,6 +359,8 @@ def predict(event, context):
return result
async def predict_async(event, context):
flight_docs = get_flight_docs()
sem = asyncio.Semaphore(5)
path = "ham-telm-*/_search"
interval = 60 # because some aprs balloons are only every minute
@ -445,7 +524,7 @@ async def predict_async(event, context):
logging.debug("Start Predict")
jobs=[]
for serial in serials:
jobs.append(run_predictions_for_serial(sem, serial, serials[serial]))
jobs.append(run_predictions_for_serial(sem, flight_docs, serial, serials[serial]))
output = await asyncio.gather(*jobs)
for data in output:
if data:
@ -489,61 +568,113 @@ async def predict_async(event, context):
async def run_predictions_for_serial(sem, serial, value):
async def run_predictions_for_serial(sem, flight_docs, serial, value):
async with sem:
loop = asyncio.get_event_loop()
#
# Flight Profile selection
#
# Fallback Option - use flight profile data based on sonde type.
_flight_profile = flight_profile_by_type("")
#print(value)
#print(_flight_profile)
logging.debug(f"Running prediction for {serial} using flight profile {str(_flight_profile)}.")
# skip when close to 0.
if value['rate'] < ASCENT_RATE_THRESHOLD and value['rate'] > -ASCENT_RATE_THRESHOLD:
logging.debug(f"Skipping {serial} due to ascent rate limiting.")
return False
# Determine current ascent rate
# If the value is < 0.5 (e.g. we are on descent, or not moving), we just use a default value.
ascent_rate=value['rate'] if value['rate'] > ASCENT_RATE_THRESHOLD else _flight_profile['ascent_rate']
logging.debug(f"Starting prediction for {serial}")
try:
_flight_doc = flight_docs[serial]
logging.debug(f"Found flight doc for {serial}")
except:
logging.debug(f"Using default flight doc for {serial}")
_flight_doc = {
"float_expected": False,
"peak_altitude": 30000,
"descent_rate": 5
}
# If we are on descent, estimate the sea-level descent rate from the current descent rate
# Otherwise, use the flight profile descent rate
descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt']) if value['rate'] < 0 else _flight_profile['descent_rate']
# If the resultant sea-level descent rate is very small, it means we're probably landed
# so dont run a prediction for this sonde.
if descent_rate < ASCENT_RATE_THRESHOLD:
return False
# Now to determine the burst altitude
if value['rate'] < 0:
# On descent (rate < 0), we need to set the burst altitude just higher than our current altitude for
# the predictor to be happy
burst_altitude = value['alt']+0.05
else:
# Otherwise, on ascent we either use the expected burst altitude, or we
# add a little bit on to our current altitude.
burst_altitude = (value['alt']+0.05) if value['alt'] > _flight_profile['burst_altitude'] else _flight_profile['burst_altitude']
longitude = float(value['position'][1].strip())
latitude = float(value['position'][0].strip())
#print(f"Prediction Parameters for {serial} at {latitude}, {longitude}, {value['alt']}: {ascent_rate}/{burst_altitude}/{descent_rate}")
# Now to determine the burst altitude
if value['rate'] > 0:
if value['alt'] > _flight_doc['peak_altitude']:
burst_altitude = value['alt']+0.05 # balloon past the prediction - use just a little bit higher
else:
burst_altitude = _flight_doc['peak_altitude']
else: #descending
burst_altitude = value['alt']+0.05
# Run prediction! This will return None if there is an error
return [serial, await loop.run_in_executor(None, functools.partial(get_standard_prediction,
value['time'],
latitude,
longitude,
value['alt'],
current_rate=value['rate'],
ascent_rate=ascent_rate,
burst_altitude=burst_altitude,
descent_rate=descent_rate
))]
logging.debug(f"Burst alt for {serial} is {burst_altitude}")
# balloon on descent
if value['rate'] < -ASCENT_RATE_THRESHOLD:
logging.debug(f"{serial} running normal descent profile")
descent_rate= seaLevelDescentRate(abs(value['rate']),value['alt'])
return [
serial,
await loop.run_in_executor(
None,
functools.partial(
get_standard_prediction,
value['time'],
latitude,
longitude,
value['alt'],
current_rate=value['rate'],
ascent_rate=5, # this doesn't matter because we are burst
burst_altitude=burst_altitude,
descent_rate=descent_rate
)
)
]
if _flight_doc["float_expected"]:
if abs(value['rate']) < ASCENT_RATE_THRESHOLD:
logging.debug(f"{serial} running float profile")
return [
serial,
await loop.run_in_executor(
None,
functools.partial(
get_float_prediction,
value['time'],
latitude,
longitude,
value['alt'],
current_rate=value['rate'],
ascent_rate=1, # this doesn't matter because we are floating
burst_altitude=burst_altitude,
)
)
]
else: # ascending to float
logging.debug(f"{serial} running ascending float profile")
if abs(value['rate']) > ASCENT_RATE_THRESHOLD:
return [
serial,
await loop.run_in_executor(
None,
functools.partial(
get_float_prediction,
value['time'],
latitude,
longitude,
value['alt'],
current_rate=value['rate'],
ascent_rate=value['rate'],
burst_altitude=burst_altitude,
)
)
]
logging.debug(f"{serial} running standard ascent profile")
# standard balloon on ascent
return [
serial,
await loop.run_in_executor(
None,
functools.partial(
get_standard_prediction,
value['time'],
latitude,
longitude,
value['alt'],
current_rate=value['rate'],
ascent_rate=value['rate'],
burst_altitude=burst_altitude,
descent_rate=_flight_doc['descent_rate']
)
)
]