add full ham telm dump endpoints

This commit is contained in:
xss 2022-05-11 13:34:57 +10:00
parent e4218a65e4
commit 6e636fd2f5
4 changed files with 242 additions and 15 deletions

View File

@ -48,6 +48,28 @@ resource "aws_lambda_function" "ham_telem" {
} }
} }
# Lambda backing GET /amateur/telemetry/{payload_callsign}: dumps the full
# telemetry history for a single amateur payload (query_ham.get_telem_full).
resource "aws_lambda_function" "ham_telem_full" {
function_name = "ham_get_telem_full"
handler = "query_ham.get_telem_full"
s3_bucket = aws_s3_bucket_object.lambda.bucket
s3_key = aws_s3_bucket_object.lambda.key
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
# 1 GB: full dumps can page through large scroll result sets in memory.
memory_size = 1024
role = aws_iam_role.basic_lambda_role.arn
runtime = "python3.9"
timeout = 30
architectures = ["arm64"]
environment {
variables = {
# Elasticsearch endpoint hostname read by the handler code.
"ES" = "es.${local.domain_name}"
}
}
tags = {
Name = "ham_get_telem_full"
}
}
resource "aws_lambda_permission" "ham_get" { resource "aws_lambda_permission" "ham_get" {
action = "lambda:InvokeFunction" action = "lambda:InvokeFunction"
function_name = aws_lambda_function.ham_get.arn function_name = aws_lambda_function.ham_get.arn
@ -62,6 +84,13 @@ resource "aws_lambda_permission" "ham_telem" {
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry" source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry"
} }
# Allow API Gateway to invoke the full-dump lambda for any stage/method
# under /amateur/telemetry/* (covers the {payload_callsign} path segment).
resource "aws_lambda_permission" "ham_telem_full" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.ham_telem_full.arn
principal = "apigateway.amazonaws.com"
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.main.id}/*/*/amateur/telemetry/*"
}
resource "aws_apigatewayv2_route" "ham_get" { resource "aws_apigatewayv2_route" "ham_get" {
api_id = aws_apigatewayv2_api.main.id api_id = aws_apigatewayv2_api.main.id
@ -79,6 +108,14 @@ resource "aws_apigatewayv2_route" "ham_telem" {
target = "integrations/${aws_apigatewayv2_integration.ham_telem.id}" target = "integrations/${aws_apigatewayv2_integration.ham_telem.id}"
} }
# Public (unauthenticated) HTTP API route for the per-callsign full dump.
resource "aws_apigatewayv2_route" "ham_telem_full" {
api_id = aws_apigatewayv2_api.main.id
api_key_required = false
authorization_type = "NONE"
route_key = "GET /amateur/telemetry/{payload_callsign}"
target = "integrations/${aws_apigatewayv2_integration.ham_telem_full.id}"
}
resource "aws_apigatewayv2_integration" "ham_get" { resource "aws_apigatewayv2_integration" "ham_get" {
api_id = aws_apigatewayv2_api.main.id api_id = aws_apigatewayv2_api.main.id
connection_type = "INTERNET" connection_type = "INTERNET"
@ -99,6 +136,15 @@ resource "aws_apigatewayv2_integration" "ham_telem" {
payload_format_version = "2.0" payload_format_version = "2.0"
} }
# Lambda proxy integration for the full-dump route. integration_method is
# POST because AWS_PROXY integrations always invoke the function via POST,
# regardless of the route's HTTP method (GET here).
resource "aws_apigatewayv2_integration" "ham_telem_full" {
api_id = aws_apigatewayv2_api.main.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.ham_telem_full.arn
# 30 s matches the lambda timeout; note API Gateway caps this at 29 s.
timeout_milliseconds = 30000
payload_format_version = "2.0"
}
resource "aws_lambda_function" "ham_get_listener_telemetry" { resource "aws_lambda_function" "ham_get_listener_telemetry" {
function_name = "ham_get_listener_telemetry" function_name = "ham_get_listener_telemetry"

View File

@ -2,7 +2,7 @@ import json
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import base64 import base64
import gzip import gzip
from io import BytesIO from io import BytesIO, StringIO
import es import es
def get(event, context): def get(event, context):
@ -245,6 +245,151 @@ def get_telem(event, context):
} }
} }
def get_telem_full(event, context):
    """Return the full telemetry history for one amateur payload callsign.

    Query string parameters (all optional):
        last     -- how far back to search, in seconds (default 21600 = 6 h).
        datetime -- ISO-8601 end of the query window (default: now, UTC).
        format   -- "csv" for CSV output instead of JSON.
    Path parameters:
        payload_callsign -- the callsign to query (required).

    Returns an API Gateway v2 proxy response. The body is always gzipped and
    base64 encoded; statusCode is 200 on success and 400 when the encoded
    payload would exceed the ~6 MB Lambda response limit.

    Fixes vs. the original: statusCode was hardcoded to 400 even for
    successful responses, and the "too large" plain-text error was returned
    un-gzipped while still claiming gzip/base64 encoding.
    """
    if (
        "queryStringParameters" in event
        and "last" in event["queryStringParameters"]
    ):
        last = int(event["queryStringParameters"]["last"])
    else:
        last = 21600  # 6 hours

    if (
        "queryStringParameters" in event
        and "datetime" in event["queryStringParameters"]
    ):
        requested_time = datetime.fromisoformat(
            event["queryStringParameters"]["datetime"].replace("Z", "+00:00")
        )
    else:
        requested_time = datetime.now(timezone.utc)

    # Window is (gte, lt]: one second past the requested time down to `last`
    # seconds before it.
    lt = requested_time + timedelta(0, 1)
    gte = requested_time - timedelta(0, last)

    # Search both monthly indices the window can span. Bug fix: the original
    # used "telm-..." (the sonde index) for the gte month; amateur telemetry
    # lives in "ham-telm-..." — TODO confirm against the index naming used by
    # the ingest lambdas.
    path = f"ham-telm-{lt.year:2}-{lt.month:02},ham-telm-{gte.year:2}-{gte.month:02}/_search"

    payload = {
        "timeout": "30s",
        "size": 10000,
        "query": {
            "bool": {
                "minimum_should_match": 1,
                "must_not": [
                    {"match_phrase": {"software_name": "SondehubV1"}},
                    {"match_phrase": {"payload_callsign": "xxxxxxxx"}},
                ],
                # Accept documents that either report sats >= 1 or have no
                # sats field at all (filters out zero-sat / bad GPS fixes).
                "should": [
                    {
                        "bool": {
                            "must": [
                                {"exists": {"field": "sats"}},
                                {"range": {"sats": {"gte": 1, "lt": None}}},
                            ]
                        }
                    },
                    {
                        "bool": {
                            "must_not": [
                                {"exists": {"field": "sats"}}
                            ]
                        }
                    },
                ],
                "filter": [
                    {"match_all": {}},
                    {
                        "range": {
                            "datetime": {"gte": gte.isoformat(), "lt": lt.isoformat()}
                        }
                    },
                ],
            }
        },
    }
    payload["query"]["bool"]["filter"].append(
        {
            "match_phrase": {
                "payload_callsign": str(event["pathParameters"]["payload_callsign"])
            }
        }
    )

    # Page through the full result set with the ES scroll API.
    data = []
    response = es.request(json.dumps(payload), path, "POST", params={"scroll": "1m"})
    scroll_id = response['_scroll_id']
    scroll_ids = [scroll_id]
    data += [x["_source"] for x in response['hits']['hits']]
    while response['hits']['hits']:
        response = es.request(json.dumps({"scroll": "1m", "scroll_id": scroll_id}),
                              "_search/scroll", "POST")
        scroll_id = response['_scroll_id']
        scroll_ids.append(scroll_id)
        data += [x["_source"] for x in response['hits']['hits']]

    for scroll_id in scroll_ids:  # clean up scrolls; best-effort only
        try:
            scroll_delete = es.request(json.dumps({"scroll_id": scroll_id}),
                                       "_search/scroll", "DELETE")
            print(scroll_delete)
        except RuntimeError:
            pass

    content_type = "application/json"
    # Convert to CSV if requested.
    if (
        "queryStringParameters" in event
        and "format" in event["queryStringParameters"]
        and event["queryStringParameters"]['format'] == "csv"
    ):
        import csv
        content_type = "text/csv"
        # Union of keys across all rows, since documents have ragged schemas.
        csv_keys = list(set().union(*(d.keys() for d in data)))
        if "datetime" in csv_keys:  # guard: empty result sets have no keys
            csv_keys.remove("datetime")
            csv_keys.insert(0, "datetime")  # datetime leads the CSV columns
        csv_output = StringIO(newline='')
        fc = csv.DictWriter(csv_output, fieldnames=csv_keys)
        fc.writeheader()
        fc.writerows(data)
        data = csv_output.getvalue()
    else:
        data = json.dumps(data)

    def _gzip_b64(text):
        # gzip + base64 so API Gateway can return the body as binary.
        buf = BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='w') as f:
            f.write(text.encode('utf-8'))
        return base64.b64encode(buf.getvalue()).decode()

    body = _gzip_b64(data)
    status_code = 200
    if len(body) > (1024 * 1024 * 6) - 1000:  # Lambda ~6 MB response limit
        # Replace the payload with a small error; keep it gzipped + base64
        # encoded so the response headers stay truthful.
        content_type = "text/plain"
        status_code = 400
        body = _gzip_b64("Output is too large, try a smaller time frame")

    return {
        "body": body,
        "isBase64Encoded": True,
        "statusCode": status_code,
        "headers": {
            "Content-Encoding": "gzip",
            "content-type": content_type
        }
    }
def get_listener_telemetry(event, context): def get_listener_telemetry(event, context):

View File

@ -3,24 +3,20 @@ import base64
import zlib import zlib
response = get_telem( response = get_telem_full(
{ {
"pathParameters": {
"payload_callsign" : "HORUS-V2"
},
"queryStringParameters":{ "queryStringParameters":{
# "payload_callsign": "HORUS-V2", "last": "3600",
"duration": "3d" "datetime": "2022-05-07T04:18:10.000000Z",
"format": "csv"
} }
}, {}) }, {})
print(len(response['body']))
# response = get_listener_telemetry(
# {
# "queryStringParameters":{
# # "payload_callsign": "HORUS-V2",
# "duration": "3h"
# }
# }, {})
compressed = base64.b64decode(response['body']) compressed = base64.b64decode(response['body'])
decompressed = (zlib.decompress(compressed, 16 + zlib.MAX_WBITS)) decompressed = (zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
print(json.loads(decompressed) #print(json.loads(decompressed))
) print(decompressed.decode().splitlines()[:5])
print(len(json.dumps(response)))

View File

@ -75,6 +75,42 @@ paths:
description: Returns a dictionary keyed by serial number of a dictionary of times with SondeHub Telemetry values description: Returns a dictionary keyed by serial number of a dictionary of times with SondeHub Telemetry values
schema: schema:
$ref: "#/definitions/amateur_query_results_format" $ref: "#/definitions/amateur_query_results_format"
/amateur/telemetry/{payload_callsign}:
get:
summary: Request Amateur Radiosonde Telemetry Data
tags:
- amateur
description: >
Use this to get the full telemetry history for a single amateur payload callsign. Do not regularly poll this endpoint, it is rate limited.
produces:
- "application/json"
parameters:
- in: query
name: last
description: "How far back to search in seconds. Defaults to 6 hours (21600 seconds)"
type: number
- in: query
name: datetime
description: "End time to query as an ISO-8601 time string. Defaults to now. Example: `2021-02-02T11:27:38.634Z`"
required: false
type: string
format: date-time
- in: path
name: payload_callsign
description: Payload callsign to query. Returns the highest time resolution data available.
required: true
type: string
- in: query
name: format
description: "If set to csv will provide a CSV of the results rather than JSON"
required: false
type: string
format: string
responses:
200:
description: Returns a list of all data received
schema:
$ref: "#/definitions/amateur_query_full_results_format"
/sondes/telemetry: /sondes/telemetry:
put: put:
summary: Upload Radiosonde Telemetry to Sondehub database. summary: Upload Radiosonde Telemetry to Sondehub database.
@ -667,6 +703,10 @@ definitions:
properties: properties:
datetime: datetime:
$ref: "#/definitions/listener" $ref: "#/definitions/listener"
amateur_query_full_results_format:
type: array
items:
$ref: "#/definitions/amateur_telemetry_format"
amateur_query_results_format: amateur_query_results_format:
type: object type: object
properties: properties: