first pass at recovery endpoint

This commit is contained in:
Michaela 2021-06-06 11:19:58 +10:00
parent 6da25b61ba
commit 021415aa6a
3 changed files with 415 additions and 1 deletions

139
recovered.tf Normal file
View File

@ -0,0 +1,139 @@
data "archive_file" "recovered" {
type = "zip"
source_file = "recovered/lambda_function.py"
output_path = "${path.module}/build/recovered.zip"
}
resource "aws_iam_role" "recovered" {
path = "/service-role/"
name = "recovered"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
EOF
max_session_duration = 3600
}
resource "aws_iam_role_policy" "recovered" {
name = "recovered"
role = aws_iam_role.recovered.name
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
]
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "*"
}
]
}
EOF
}
resource "aws_lambda_function" "recovered_get" {
function_name = "recovered_get"
handler = "lambda_function.get"
filename = "${path.module}/build/recovered.zip"
source_code_hash = data.archive_file.recovered.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.recovered.arn
runtime = "python3.8"
timeout = 30
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = aws_route53_record.Route53RecordSet7.fqdn
}
}
}
resource "aws_lambda_function" "recovered_put" {
function_name = "recovered_put"
handler = "lambda_function.put"
filename = "${path.module}/build/recovered.zip"
source_code_hash = data.archive_file.recovered.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.recovered.arn
runtime = "python3.8"
timeout = 30
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = aws_route53_record.Route53RecordSet7.fqdn
}
}
}
resource "aws_lambda_permission" "recovered_get" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.recovered_get.arn
principal = "apigateway.amazonaws.com"
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/recovered"
}
resource "aws_lambda_permission" "recovered_put" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.recovered_put.arn
principal = "apigateway.amazonaws.com"
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.ApiGatewayV2Api.id}/*/*/recovered"
}
resource "aws_apigatewayv2_integration" "recovered_get" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.recovered_get.arn
timeout_milliseconds = 30000
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_integration" "recovered_put" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"
integration_method = "POST"
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.recovered_put.arn
timeout_milliseconds = 30000
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_route" "recovered_get" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
api_key_required = false
authorization_type = "NONE"
route_key = "GET /recovered"
target = "integrations/${aws_apigatewayv2_integration.recovered_get.id}"
}
resource "aws_apigatewayv2_route" "recovered_put" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
api_key_required = false
authorization_type = "NONE"
route_key = "PUT /recovered"
target = "integrations/${aws_apigatewayv2_integration.recovered_put.id}"
}

View File

@ -0,0 +1,271 @@
import json
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import zlib
import base64
import datetime
import os
HOST = os.getenv("ES")
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
params = json.dumps(payload)
headers = {"Host": HOST, "Content-Type": "application/json"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
return json.loads(r.text)
def getSonde(serial):
query = {
"aggs": {
"1": {
"top_hits": {
"docvalue_fields": [
{
"field": "position"
},
{
"field": "alt"
}
],
"_source": "position",
"size": 1,
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
}
},
"query": {
"bool": {
"filter": [
{
"match_phrase": {
"serial.keyword": serial
}
}
]
}
}
}
results = es_request(query, "telm-*/_search", "POST")
return results["aggregations"]["1"]["hits"]["hits"]
def getRecovered(serial):
query = {
"aggs": {
"1": {
"top_hits": {
"docvalue_fields": [
{
"field": "recovered_by.keyword"
}
],
"size": 1,
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
}
},
"query": {
"bool": {
"filter": [
{
"match_phrase": {
"serial.keyword": serial
}
},
{
"match_phrase": {
"recovered": True
}
},
]
}
}
}
results = es_request(query, "recovered*/_search", "POST")
return results["aggregations"]["1"]["hits"]["hits"]
def put(event, context):
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcnow()
).total_seconds()
except:
pass
recovered = json.loads(event["body"])
sonde_last_data = getSonde(recovered["serial"])
if len(sonde_last_data) == 0:
return {"statusCode": 400, "body": "serial not found in db"}
already_recovered = getRecovered(recovered["serial"])
if len(already_recovered) != 0:
recovered_by = already_recovered[0]['fields']['recovered_by.keyword'][0]
return {"statusCode": 400, "body": f"Already recovered by {recovered_by}"}
recovered['position'] = [recovered['lon'], recovered['lat']]
result = es_request(recovered, "recovered/_doc", "POST")
# add in elasticsearch extra position field
return {"statusCode": 200, "body": "^v^ telm logged"}
def get(event, context):
filters = []
last = 259200
serial = None
lat = None
lon = None
distance = None
# grab query parameters
if "queryStringParameters" in event:
if "last" in event["queryStringParameters"]:
last = int(event['queryStringParameters']['last'])
if "serial" in event["queryStringParameters"]:
serial = event['queryStringParameters']['serial']
if "lat" in event["queryStringParameters"]:
lat = float(event["queryStringParameters"]['lat'])
if "lon" in event["queryStringParameters"]:
lon = float(event["queryStringParameters"]['lon'])
if "distance" in event["queryStringParameters"]:
distance = int(event["queryStringParameters"]['distance'])
if last != 0:
filters.append(
{
"range": {
"datetime": {
"gte": f"now-{last}s",
"lte": "now",
}
}
}
)
if serial:
filters.append(
{
"match_phrase": {
"serial.keyword": serial
}
}
)
if lat and lon and distance:
filters.append(
{
"geo_distance": {
"distance": f"{distance}m",
"position": {
"lat": lat,
"lon": lon,
},
}
}
)
query = {
"query": {
"bool": {
"filter": filters
}
}
}
results = es_request(query, "recovered*/_search", "POST")
output = [x["_source"] for x in results['hits']['hits']]
return {"statusCode": 200, "body": json.dumps(output)}
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /recovered",
"rawPath": "/recovered",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "deflate",
"content-encoding": "",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": json.dumps({
"datetime": "2021-06-06T01:10:07.629Z",
"serial": "string",
"lat": 0,
"lon": 0,
"alt": 0,
"recovered": True,
"recovered_by": "string",
"description": "string"
}),
"isBase64Encoded": False,
}
print(put(payload, {}))
# print(get(payload,{}))

View File

@ -288,6 +288,10 @@ paths:
produces:
- "application/json"
parameters:
- in: query
name: serial
type: string
description: radiosonde serial number to filter on. If none provided all serials will be presented
- in: query
name: lat
type: number
@ -302,7 +306,7 @@ paths:
type: number
- in: query
name: last
description: "How far back to search in seconds. Defaults to 14 days"
description: "How far back to search in seconds. Defaults to 3 days. Set to 0 for all"
type: number
responses: