add listener telem endpoint

This commit is contained in:
Michaela 2021-07-21 19:54:09 +10:00
parent 1e698e7403
commit 197b1ce649
3 changed files with 252 additions and 35 deletions

45
main.tf
View File

@ -685,6 +685,27 @@ resource "aws_lambda_function" "get_telem" {
]
}
resource "aws_lambda_function" "get_listener_telemetry" {
function_name = "get_listener_telemetry"
handler = "lambda_function.get_listener_telemetry"
filename = "${path.module}/build/query.zip"
source_code_hash = data.archive_file.query.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.IAMRole5.arn
runtime = "python3.7"
timeout = 30
layers = [
"arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:xray-python:1",
"arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:iot:3"
]
environment {
variables = {
"ES" = "es.${local.domain_name}"
}
}
}
resource "aws_lambda_function" "sign_socket" {
function_name = "sign-websocket"
handler = "lambda_function.lambda_handler"
@ -778,6 +799,12 @@ resource "aws_lambda_permission" "get_telem" {
principal = "apigateway.amazonaws.com"
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/sondes/telemetry"
}
resource "aws_lambda_permission" "get_listener_telemetry" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.get_listener_telemetry.arn
principal = "apigateway.amazonaws.com"
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/listeners/telemetry"
}
resource "aws_lambda_permission" "LambdaPermission2" {
action = "lambda:InvokeFunction"
@ -947,6 +974,14 @@ resource "aws_apigatewayv2_route" "get_telem" {
target = "integrations/${aws_apigatewayv2_integration.get_telem.id}"
}
resource "aws_apigatewayv2_route" "get_listener_telemetry" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
api_key_required = false
authorization_type = "NONE"
route_key = "GET /listeners/telemetry"
target = "integrations/${aws_apigatewayv2_integration.get_listener_telemetry.id}"
}
resource "aws_apigatewayv2_integration" "sign_socket" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"
@ -1017,6 +1052,16 @@ resource "aws_apigatewayv2_integration" "get_telem" {
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_integration" "get_listener_telemetry" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.get_listener_telemetry.arn
timeout_milliseconds = 30000
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_integration" "ApiGatewayV2Integration" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"

View File

@ -165,6 +165,7 @@ def get_telem(event, context):
},
"query": {
"bool": {
"must_not": [{"match_phrase": {"software_name": "SondehubV1"}}, {"match_phrase": {"serial": "xxxxxxxx"}}],
"filter": [
{"match_all": {}},
{
@ -193,8 +194,146 @@ def get_telem(event, context):
}
for sonde in results["aggregations"]["2"]["buckets"]
}
return json.dumps(output)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
json_response = json.dumps(output)
f.write(json_response.encode('utf-8'))
gzippedResponse = compressed.getvalue()
return {
"body": base64.b64encode(gzippedResponse).decode(),
"isBase64Encoded": True,
"statusCode": 200,
"headers": {
"Content-Encoding": "gzip",
"content-type": "application/json"
}
}
def get_listener_telemetry(event, context):
durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES
"3d": (259200, 1200), # 3d, 20m
"1d": (86400, 600), # 1d, 10m
"12h": (43200, 600), # 1d, 10m
"6h": (21600, 60), # 6h, 1m
"3h": (10800, 15), # 3h, 10s
"1h": (3600, 15),
"30m": (3600, 5),
"1m": (60, 1),
"15s": (15, 1)
}
duration_query = "3h"
requested_time = datetime.now(timezone.utc)
if (
"queryStringParameters" in event
and "duration" in event["queryStringParameters"]
):
if event["queryStringParameters"]["duration"] in durations:
duration_query = event["queryStringParameters"]["duration"]
else:
return f"Duration must be either {', '.join(durations.keys())}"
if (
"queryStringParameters" in event
and "datetime" in event["queryStringParameters"]
):
requested_time = datetime.fromisoformat(
event["queryStringParameters"]["datetime"].replace("Z", "+00:00")
)
(duration, interval) = durations[duration_query]
if "uploader_callsign" in event["queryStringParameters"]:
interval = 1
lt = requested_time
gte = requested_time - timedelta(0, duration)
path = "listeners-*/_search"
payload = {
"timeout": "30s",
"aggs": {
"2": {
"terms": {
"field": "uploader_callsign.keyword",
"order": {"_key": "desc"},
"size": 10000,
},
"aggs": {
"3": {
"date_histogram": {
"field": "ts",
"fixed_interval": f"{str(interval)}s",
"min_doc_count": 1,
},
"aggs": {
"1": {
"top_hits": {
# "docvalue_fields": [
# {"field": "position"},
# {"field": "alt"},
# {"field": "datetime"},
# ],
# "_source": "position",
"size": 1,
"sort": [{"ts": {"order": "desc"}}],
}
}
},
}
},
}
},
"query": {
"bool": {
"filter": [
{"match_all": {}},
{
"range": {
"ts": {"gte": gte.isoformat(), "lt": lt.isoformat()}
}
},
]
}
},
}
if "queryStringParameters" in event:
if "uploader_callsign" in event["queryStringParameters"]:
payload["query"]["bool"]["filter"].append(
{
"match_phrase": {
"uploader_callsign": str(event["queryStringParameters"]["uploader_callsign"])
}
}
)
results = es_request(payload, path, "POST")
output = {
sonde["key"]: {
data["key_as_string"]: data["1"]["hits"]["hits"][0]["_source"]
for data in sonde["3"]["buckets"]
}
for sonde in results["aggregations"]["2"]["buckets"]
}
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
json_response = json.dumps(output)
f.write(json_response.encode('utf-8'))
gzippedResponse = compressed.getvalue()
return {
"body": base64.b64encode(gzippedResponse).decode(),
"isBase64Encoded": True,
"statusCode": 200,
"headers": {
"Content-Encoding": "gzip",
"content-type": "application/json"
}
}
def datanew(event, context):
durations = { # ideally we shouldn't need to predefine these, but it's a shit load of data and we don't need want to overload ES
@ -635,46 +774,45 @@ if __name__ == "__main__":
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
print(
datanew(
{
"queryStringParameters": {
"mode": "single",
"format": "json",
"position_id": "S1443103-2021-07-20T12:46:19.040000Z"
}
},
{},
)
)
# print(
# datanew(
# {
# "queryStringParameters": {
# "mode": "single",
# "format": "json",
# "position_id": "S1443103-2021-07-20T12:46:19.040000Z"
# }
# },
# {},
# )
# )
# print(
# get_listeners(
# {},{}
# )
# )
# print (
# get_telem(
# {"queryStringParameters": {
# "duration": "1d",
# "serial": "T1230861"
# }
# },
# {}
# )
# )
print(
datanew(
{
"queryStringParameters": {
"type": "positions",
"mode": "3hours",
"position_id": "0",
"vehicles": "S1443103"
}
print (
get_chase(
{"queryStringParameters": {
"duration": "1d"
}
},
{},
{}
)
)
# print(
# datanew(
# {
# "queryStringParameters": {
# "type": "positions",
# "mode": "3hours",
# "position_id": "0",
# "vehicles": "S1443103"
# }
# },
# {},
# )
# )

View File

@ -174,6 +174,40 @@ paths:
responses:
200:
description: Station Position successfully uploaded.
/listeners/telemetry:
get:
summary: Request Listener Telemetry Data
description: >
Use this to get the current listener (chase car / station) telemetry
produces:
- "application/json"
parameters:
- in: query
name: duration
description: How far back in time to receive data from. A shorter time period will result is higher time resolution data.
required: false
type: string
enum:
- "3h"
- "6h"
- "1d"
- "3d"
- in: query
name: uploader_callsign
description: Specific callsign number to query (if wanted). Requests for data for a single uploader will return the highest time resolution data available.
required: false
type: string
- in: query
name: datetime
description: "End time to query as an ISO-8601 time string. Defaults to now. Example: `2021-02-02T11:27:38.634Z`"
required: false
type: string
format: date-time
responses:
200:
description: Returns a dictionary keyed by uploader_callsign of a dictionary of times with listener data.
schema:
$ref: "#/definitions/listener"
/sondes/websocket:
get:
description: Gets a presigned URL for use in connecting to the MQTT websocket endpoint.