switch station to post direct to elk

This commit is contained in:
Michaela 2021-10-08 19:26:04 +11:00
parent b8dfe68198
commit 14e8b050ea
2 changed files with 94 additions and 74 deletions
sonde-api-to-iot-core
station-api-to-iot-core

View File

@ -264,7 +264,7 @@ def upload(event, context):
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcnow() # TODO we should use the request context datetime here so that timedelta works for chunked requests
- datetime.datetime.utcfromtimestamp(event["requestContext"]["timeEpoch"]/1000)
).total_seconds()
except:
pass

View File

@ -4,68 +4,22 @@ import zlib
import base64
import datetime
import functools
from awscrt import io, mqtt, auth, http
from awscrt.exceptions import AwsCrtError
from awsiot import mqtt_connection_builder
import uuid
import threading
from email.utils import parsedate
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import boto3
import botocore.credentials
import os
from io import BytesIO
import gzip
# this needs a bunch of refactor but the general approach is
# connect to mqtt via websockets during init
# if we detect that we are disconnected then reconnect
# this is to make the lambda function nice and quick when during
# peak load
# todo
# we should add some value checking
# we typically perform version banning here based on user agent
# xray doesn't know about mqtt, we should teach it
# we should probably get sondehub v1 stuff in here as well
# error handling - at the moment we bail on a single failure
# report to the user what's happened
# probably turn down logging since cloudwatch costs $$$
# env variable some of this
# work out how to have a dev env
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
def connect():
global connect_future, mqtt_connection
session = boto3.session.Session()
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
client_bootstrap
)
mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=os.getenv("IOT_ENDPOINT"),
client_bootstrap=client_bootstrap,
region="us-east-1",
credentials_provider=credentials_provider,
client_id=str(uuid.uuid4()),
clean_session=False,
keep_alive_secs=6,
)
connect_future = mqtt_connection.connect()
connect_future.result()
connect()
HOST = os.getenv("ES")
def lambda_handler(event, context):
# Future.result() waits until a result is available
try:
connect_future.result()
except:
connect()
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
@ -79,14 +33,12 @@ def lambda_handler(event, context):
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcnow()
- datetime.datetime.utcfromtimestamp(event["requestContext"]["timeEpoch"]/1000)
).total_seconds()
except:
pass
payload = json.loads(event["body"])
print(payload)
tasks = []
first = False
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
@ -96,21 +48,89 @@ def lambda_handler(event, context):
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
(msg, x) = mqtt_connection.publish(
topic=f'stations/station_position',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
try:
msg.result()
except (RuntimeError, AwsCrtError):
connect()
(msg, x) = mqtt_connection.publish(
topic=f'stations/station_position',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
msg.result()
payload.pop("uploader_contact_email", None)
index = datetime.datetime.utcnow().strftime("listeners-%Y-%m")
print(payload)
print(index)
payload["ts"] = datetime.datetime.utcnow().isoformat()
es_request(json.dumps(payload),f"{index}/_doc","POST")
return {"statusCode": 200, "body": "^v^ telm logged"}
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
if r.status_code != 200 and r.status_code != 201:
raise RuntimeError
return json.loads(r.text)
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": """
{
"software_name": "radiosonde_auto_rx",
"software_version": "1.5.5",
"uploader_callsign": "mwheeler",
"uploader_position": [
-37.8136,
144.9631,
90
],
"uploader_antenna": "mwheeler",
"uploader_contact_email": "none@none.com",
"mobile": false
}
""",
"isBase64Encoded": False,
}
print(lambda_handler(payload, {}))