# sondehub-infra/recovered/lambda_function.py
# Standard library
import base64
import datetime
import gzip
import json
import os
import zlib
from io import BytesIO
from multiprocessing import Process  # used by the (currently disabled) mirror call

# AWS SDK (bundled with the Lambda runtime)
import boto3
import botocore.credentials
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session

# Elasticsearch endpoint host, injected via the ES environment variable.
HOST = os.getenv("ES")

# Module-level session so the HTTP connection pool survives warm invocations.
http_session = URLLib3Session()

# boto3 session used only to obtain SigV4 signing credentials.
aws_session = boto3.Session()
def mirror(path, params):
    """Replay a signed request against the secondary (us-east-1) ES cluster.

    path   -- request path appended to the mirror host URL
    params -- gzip-compressed JSON payload (bytes)

    Fire-and-forget: the response is discarded. Intended to run in a
    separate Process (see the commented-out call in es_request).
    """
    # Single definition of the mirror host, used for both the Host header
    # and the URL (previously duplicated inline).
    mirror_host = "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com"
    headers = {
        "Host": mirror_host,
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    }
    request = AWSRequest(
        method="POST", url=f"https://{mirror_host}/{path}", data=params, headers=headers
    )
    SigV4Auth(aws_session.get_credentials(), "es", "us-east-1").add_auth(request)
    # Fresh session: this may run in a forked Process, so the shared
    # module-level session is not reused here.
    session = URLLib3Session()
    session.send(request.prepare())
def es_request(payload, path, method):
    """Send a signed, gzip-compressed request to the Elasticsearch cluster.

    payload -- JSON-serialisable request body
    path    -- request path, e.g. "telm-*/_search"
    method  -- HTTP method to use (previously accepted but ignored; every
               request was sent as POST regardless)

    Returns the decoded JSON response as a dict.
    """
    params = json.dumps(payload)
    # Gzip the body to cut payload size on the wire; the Content-Encoding
    # header below tells ES how to decode it.
    compressed = BytesIO()
    with gzip.GzipFile(fileobj=compressed, mode='w') as f:
        f.write(params.encode('utf-8'))
    body = compressed.getvalue()

    headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding": "gzip"}
    request = AWSRequest(
        method=method, url=f"https://{HOST}/{path}", data=body, headers=headers
    )
    SigV4Auth(aws_session.get_credentials(), "es", "us-east-1").add_auth(request)

    # Optional replication to the secondary cluster (currently disabled):
    # Process(target=mirror, args=(path, body)).start()

    r = http_session.send(request.prepare())
    return json.loads(r.text)
def getSonde(serial):
    """Return the most recent telemetry hit for *serial* (empty list if none)."""
    newest_first = [{"datetime": {"order": "desc"}}]
    query = {
        "aggs": {
            "1": {
                "top_hits": {
                    "docvalue_fields": [
                        {"field": "position"},
                        {"field": "alt"},
                    ],
                    "_source": "position",
                    "size": 1,
                    "sort": newest_first,
                }
            }
        },
        "query": {
            "bool": {
                "filter": [
                    {"match_phrase": {"serial.keyword": serial}},
                ]
            }
        },
    }
    results = es_request(query, "telm-*/_search", "POST")
    return results["aggregations"]["1"]["hits"]["hits"]
def getRecovered(serial):
    """Return the latest recovery report for *serial* where recovered is true.

    Empty list means the sonde has not been marked recovered yet.
    """
    match_serial = {"match_phrase": {"serial.keyword": serial}}
    match_recovered = {"match_phrase": {"recovered": True}}
    query = {
        "aggs": {
            "1": {
                "top_hits": {
                    "docvalue_fields": [
                        {"field": "recovered_by.keyword"},
                    ],
                    "size": 1,
                    "sort": [
                        {"datetime": {"order": "desc"}},
                    ],
                }
            }
        },
        "query": {
            "bool": {
                "filter": [match_serial, match_recovered],
            }
        },
    }
    results = es_request(query, "recovered*/_search", "POST")
    return results["aggregations"]["1"]["hits"]["hits"]
def put(event, context):
    """Handle PUT /recovered: log a sonde recovery report.

    Decodes/decompresses the request body, validates the serial against
    existing telemetry, rejects duplicate recoveries, then indexes the
    report into the "recovered" index.
    Returns an API-Gateway-style {"statusCode", "body"} dict.
    """
    # Local import: parsedate was previously never imported, so the Date
    # header block below always failed with NameError, silently swallowed
    # by the bare except.
    from email.utils import parsedate

    if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
        event["body"] = base64.b64decode(event["body"])
    if (
        "content-encoding" in event["headers"]
        and event["headers"]["content-encoding"] == "gzip"
    ):
        # 16 + MAX_WBITS tells zlib to expect a gzip (not raw deflate) stream.
        event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)

    # Clock skew between client and server, in seconds. Informational only;
    # computed defensively and left None when the header is absent or bad.
    time_delta = None
    if "date" in event["headers"]:
        try:
            time_delta_header = event["headers"]["date"]
            # [:6] not [:7]: parsedate's 7th field is the weekday, which
            # must not be passed to datetime() as microseconds.
            time_delta = (
                datetime.datetime(*parsedate(time_delta_header)[:6])
                - datetime.datetime.utcnow()
            ).total_seconds()
        except (TypeError, ValueError):
            # parsedate returns None (TypeError) or fields are out of range.
            pass

    recovered = json.loads(event["body"])

    if "datetime" not in recovered:
        recovered["datetime"] = datetime.datetime.now().isoformat()

    # Validate the serial before hitting the database.
    if recovered["serial"] == "":
        return {"statusCode": 400, "body": json.dumps({"message": "serial cannot be empty"})}

    sonde_last_data = getSonde(recovered["serial"])
    if len(sonde_last_data) == 0:
        return {"statusCode": 400, "body": json.dumps({"message": "serial not found in db"})}

    already_recovered = getRecovered(recovered["serial"])
    if len(already_recovered) != 0:
        recovered_by = already_recovered[0]['fields']['recovered_by.keyword'][0]
        return {"statusCode": 400, "body": json.dumps({"message": f"Already recovered by {recovered_by}"})}

    # Extra geo field ([lon, lat] order) so Elasticsearch can index the location.
    recovered['position'] = [recovered['lon'], recovered['lat']]
    result = es_request(recovered, "recovered/_doc", "POST")

    return {"statusCode": 200, "body": json.dumps({"message": "telm logged. Have a good day ^_^"})}
def get(event, context):
    """Handle GET /recovered: search recovery reports.

    Optional query-string parameters:
      last               -- look-back window in seconds (default 259200 = 3 days;
                            0 disables the time filter)
      serial             -- comma-separated list of sonde serials
      lat, lon, distance -- geo filter centre (degrees) and radius (metres);
                            applied only when all three are present

    Returns an API-Gateway-style response whose body is a JSON list holding
    the latest report per serial (recovered reports sorted first).
    """
    filters = []
    should = []
    last = 259200
    serials = None
    lat = None
    lon = None
    distance = None

    # Parse query-string parameters.
    if "queryStringParameters" in event:
        qs = event["queryStringParameters"]
        if "last" in qs:
            last = int(qs["last"])
        if "serial" in qs:
            serials = qs["serial"].split(",")
        if "lat" in qs:
            lat = float(qs["lat"])
        if "lon" in qs:
            lon = float(qs["lon"])
        if "distance" in qs:
            distance = int(qs["distance"])

    if last != 0:
        filters.append(
            {
                "range": {
                    "datetime": {
                        "gte": f"now-{last}s",
                        "lte": "now",
                    }
                }
            }
        )
    if serials:
        for serial in serials:
            should.append(
                {
                    "match_phrase": {
                        "serial.keyword": serial
                    }
                }
            )
    # "is not None" rather than truthiness: lat=0 or lon=0 (equator / prime
    # meridian) are valid coordinates and must not silently drop the filter.
    if lat is not None and lon is not None and distance is not None:
        filters.append(
            {
                "geo_distance": {
                    "distance": f"{distance}m",
                    "position": {
                        "lat": lat,
                        "lon": lon,
                    },
                }
            }
        )

    query = {
        "query": {
            "bool": {
                "filter": filters,
                "should": should,
            }
        },
        "aggs": {
            # Bucket per serial, newest first, one top document per bucket.
            "2": {
                "terms": {
                    "field": "serial.keyword",
                    "order": {
                        "2-orderAgg": "desc"
                    },
                    "size": 500
                },
                "aggs": {
                    "2-orderAgg": {
                        "max": {
                            "field": "datetime"
                        },
                    },
                    "1": {
                        "top_hits": {
                            "_source": True,
                            "size": 1,
                            "sort": [
                                {
                                    "recovered": {
                                        "order": "desc"
                                    }
                                },
                                {
                                    "datetime": {
                                        "order": "desc"
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    if serials:
        # The should-clauses act as an OR over serials; require one to match.
        query["query"]["bool"]["minimum_should_match"] = 1

    results = es_request(query, "recovered*/_search", "POST")
    output = [
        bucket['1']['hits']['hits'][0]["_source"]
        for bucket in results['aggregations']['2']['buckets']
    ]
    return {"statusCode": 200, "body": json.dumps(output)}
if __name__ == "__main__":
    # Local smoke test: a captured API Gateway v2 event for PUT /recovered.
    payload = {
        "version": "2.0",
        "routeKey": "PUT /recovered",
        "rawPath": "/recovered",
        "rawQueryString": "",
        # "queryStringParameters": {
        #     "serial": "S1321065,010-2-12771"
        # },
        "headers": {
            "accept": "*/*",
            "accept-encoding": "deflate",
            "content-encoding": "",
            "content-length": "2135",
            "content-type": "application/json",
            "host": "api.v2.sondehub.org",
            "user-agent": "autorx-1.4.1-beta4",
            "x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
            "x-forwarded-for": "103.107.130.22",
            "x-forwarded-port": "443",
            "x-forwarded-proto": "https",
            "date": "Sun, 31 Jan 2021 00:21:45 GMT",
        },
        "requestContext": {
            "accountId": "143841941773",
            "apiId": "r03szwwq41",
            "domainName": "api.v2.sondehub.org",
            "domainPrefix": "api",
            "http": {
                "method": "PUT",
                "path": "/sondes/telemetry",
                "protocol": "HTTP/1.1",
                "sourceIp": "103.107.130.22",
                "userAgent": "autorx-1.4.1-beta4",
            },
            "requestId": "Z_NJvh0RoAMEJaw=",
            "routeKey": "PUT /sondes/telemetry",
            "stage": "$default",
            "time": "31/Jan/2021:00:10:25 +0000",
            "timeEpoch": 1612051825409,
        },
        "body": json.dumps({
            "datetime": "2021-06-06T01:10:07.629Z",
            "serial": "string",
            "lat": 0,
            "lon": 0,
            "alt": 0,
            "recovered": True,
            "recovered_by": "string",
            "description": "string"
        }),
        "isBase64Encoded": False,
    }
    # print(put(payload, {}))
    print(get(payload, {}))