sns api to iot

This commit is contained in:
Michaela 2021-05-23 23:14:15 +10:00
parent 860025145c
commit c886065f09
6 changed files with 78 additions and 66 deletions

View File

@ -337,6 +337,11 @@ resource "aws_iam_role_policy" "IAMPolicy4" {
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "sns:*",
"Resource": "*"
}
]
}
@ -491,7 +496,7 @@ resource "aws_route53_record" "Route53RecordSet7" {
data "archive_file" "api_to_iot" {
type = "zip"
source_file = "sonde-api-to-iot-core/lambda_function.py"
source_dir = "sonde-api-to-iot-core/"
output_path = "${path.module}/build/sonde-api-to-iot-core.zip"
}
@ -539,6 +544,7 @@ resource "aws_lambda_function" "LambdaFunction" {
environment {
variables = {
"IOT_ENDPOINT" = data.aws_iot_endpoint.endpoint.endpoint_address
"SNS_TOPIC" = aws_sns_topic.sonde_telem.arn
}
}
layers = [

View File

@ -263,7 +263,7 @@ def datanew(event, context):
"aggs": {
"1": {
"top_hits": {
"size": 8,
"size": 5,
"sort": [{"pressure": {"order": "desc","mode" : "median"}},{"humidity": {"order": "desc","mode" : "median"}},{"temp": {"order": "desc","mode" : "median"}},{"alt": {"order": "desc","mode" : "median"}}],
}
}

View File

@ -1,11 +1,11 @@
import sys
sys.path.append("vendor/lib/python3.9/site-packages")
import json
import boto3
import zlib
import base64
import datetime
import functools
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
from awscrt import io, mqtt, auth, http
from awscrt.exceptions import AwsCrtError
from awsiot import mqtt_connection_builder
@ -13,6 +13,8 @@ import uuid
import threading
from email.utils import parsedate
import os
import asyncio
import aioboto3
# this needs a bunch of refactor but the general approach is
# connect to mqtt via websockets during init
@ -23,17 +25,12 @@ import os
# todo
# we should add some value checking
# we typically perform version banning here based on user agent
# xray doesn't know about mqtt, we should teach it
# we should probably get sondehub v1 stuff in here as well
# error handling - at the moment we bail on a single failure
# report to the user what's happened
# probably turn down logging since cloudwatch costs $$$
# env variable some of this
# work out how to have a dev env
patch_all()
def set_connection_header(request, operation_name, **kwargs):
request.headers['Connection'] = 'keep-alive'
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
@ -65,69 +62,79 @@ def connect():
connect()
def lambda_handler(event, context):
# Future.result() waits until a result is available
try:
connect_future.result()
except:
connect()
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
async def upload(event, context):
global connect_future, mqtt_connection
async with aioboto3.client("sns",region_name="us-east-1") as sns:
sns.meta.events.register('request-created.dynamodb', set_connection_header)
tasks = []
# Future.result() waits until a result is available
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcnow()
).total_seconds()
connect_future.result()
except:
pass
payloads = json.loads(event["body"])
tasks = []
first = False
for payload in payloads:
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
payload["position"] = f'{payload["lat"]},{payload["lon"]}'
if time_delta:
payload["upload_time_delta"] = time_delta
if "uploader_position" in payload:
if not payload["uploader_position"]:
payload.pop("uploader_position")
else:
(payload["uploader_alt"], payload["uploader_position"]) = (
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
(msg, x) = mqtt_connection.publish(
topic=f'sondes/{payload["serial"]}',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
try:
msg.result()
except (RuntimeError, AwsCrtError):
connect()
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcnow()
).total_seconds()
except:
pass
payloads = json.loads(event["body"])
first = False
for payload in payloads:
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
payload["position"] = f'{payload["lat"]},{payload["lon"]}'
if time_delta:
payload["upload_time_delta"] = time_delta
if "uploader_position" in payload:
if not payload["uploader_position"]:
payload.pop("uploader_position")
else:
(payload["uploader_alt"], payload["uploader_position"]) = (
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
tasks.append(sns.publish(
TopicArn=os.getenv("SNS_TOPIC"),
Message=json.dumps(payload)
))
(msg, x) = mqtt_connection.publish(
topic=f'sondes/{payload["serial"]}',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
msg.result()
try:
msg.result()
except (RuntimeError, AwsCrtError):
connect()
(msg, x) = mqtt_connection.publish(
topic=f'sondes/{payload["serial"]}',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
msg.result()
await asyncio.gather(*tasks)
def lambda_handler(event, context):
asyncio.run(upload(event, context))
return {"statusCode": 200, "body": "^v^ telm logged"}
if __name__ == "__main__":
payload = {
"version": "2.0",
@ -169,4 +176,4 @@ if __name__ == "__main__":
"body": "H4sIAHD1FWAC/8WbX2/byBXFv0rg5/Vg7v87eduHYh8LJNui2KIwlFjZGHDsVJbTLop+996hszVHXpECKI7hJ1GURiR/vnPOmTt//8/Fw+OH/W9ftxdv31y8e89w+f6nix/eXOy3X77GoUvDVOLll83d46fNx/3jbrurZ/51c/Owud3UMx+2u5vNbT34HpAyKNSjt5t9/ThxMs/OceTTbvOljsLKVj92V78IOEG8+La9vfpW30uGkOvZ15v9dn8zfOACM8JlhkuCn3N+C/lt5pQz5Jx/qUN92OzrWJiGr73/tP/XZre9unsa7WK3ub65f7i/u95ebR7391e7f18MP2b7z8ft3cff6qAZkuT6Mz5v4+S7X+OYSyoqWr/x8+OXm+ubfT3Tk8aBzW0dDoyLpWwu/v0CPtejORURpTj0+PX2fnO93V193NzePtz8eld/zY/v/vTT1Y9/+fnPV+/+Nty8zf4hjtdxbu/rGUAlgZfSXMu37e7hZnj7AlLcscsP2/2Gh2t/3D3sr+qNqndTRUjqwxs9z+FhxvtXu+3H7c237fXLO5rLW/YU912h/HLx3x/e9IGCWij8GBSe4lYjzUIhq0OhiXPJ5RAKG0EhmVKxLDSGAhI+XcACJkoeBu7JRIwKQCKvx0R5ZsIbJiiRGJRZJvQPmfCzMuGifIiENEiU5BnxsE4UEF3KBEJfJiQHE04Mr8aE5GNMlFQgyvEsE7Z2nSg5sRr4JBQQaAoUaKFwRFgMBVFnKKK+ocSTOhUKOzsU8AwFDpP071BgEmHzWSi8AxQU8/oLKHgMBWIoipg9DiaPbAWXQsHSGQpMQKgsrwcFHlMUFrzSvMosqzMR0xiY6SET1DBR65UXbpmIp4plKRPSWWUK1d+tjKcyoWdngsZMeKMoOP79ZlVm1Oj1oWBGmYaCIGRHztbOHiI2TCiLoNDOMlPiW9lyplOhkHP7UeFnKKSBIlwgC8/KzPh/XFlmFklmBodMQMNESFF4AmesKNSGB7OICe+MhCQQRuSOSEiLhDwjYYlGSGgq7jw7eYQMWR8JRi0vIgocM8FRccPClXbyIAspspSJQZL0hEJTPDgi6AiFtlDoMSh4SE9wFgpaffIIaUMmk3WCPVm2gzIhmGFpQvGU3PVEwgIJx9ONxxmQ8BaJUZQZhq5JrbyIzk8dq0eZpcY4eRDDx5moFRcdxqlVSTEx0lI5AdDbd8SVGZeipzLBy5koLRM+ZmLsO6BO4jTPxOpJZgm7W1ym64TmhOrSMmEU9WUpE6idmQgn52Deceo4QKIcQ8KTepbZ0Ap0bSQgjqH44IAmmAgdKiZj3xGFjtmWhlZAPZmo7j4ncA+c+9WJQTE9Q6F5bDu08aI59O+8xrS1NSbkkLEiRSdFpoX9CbVeGiiIoggvhaJraFWhgBhVXE42HrQcijagUBhXCm6MB0fN1lkofP1KkWveizpZKSy8s/FBpYjJAxdXCuHOUGCKY+X0NY8zQNG6UcVjUIQADmDnK8X6UWZMayAEk6mVx5AAhg0TJX4WL2VCqTMT4atzMT959sjLmWjNqDZJprZNFApoc0xg7mBGTc18sk547TxQ54YJycSLJw/DzkzEt4LF32lMwBlWPA6ZOBpkhsOnImU2oUBYHYowNwh0sGJuw0Lx/6GIk0ifnNJIZpaSly6DgfeGYjDWRfVUKHA5FNZCIcegqKtPJc8qCsQOTDDyYaGwpttK43PsTZJZEokPfUOLkCi9kYipkjTu/KlIwHIk2tRKdYwEtamVUpnVE0irp9tcF7helokWiZA/XA4SiuIOS5nA3JuJeiXop/bf5TP01ZQ2otBRkknJ2haKKGHzZWL9JJMTm2WbZAJiyLCs2qZWhrqYCeitMb3Szcqvx4SP60RpzGgIt+HxTDMhHZhwwMPU6pAJC+/s1K540BmSTMTeXjRGFYWOTEBLxHOQGf5Bm6XyzEizjVaoHZwomTEcEjHu0tU6gRk5tkjE1crSBQ+kzplV/O6wv9mlHxK5hcLycSiETWebatA6lImiXnAaijCe8tT0P4Iii+JS24HcOd2uHWKWKXesE7lNtw2OLXmEbOenNbNpKHx1jQmpEM9AQcGwFm71BIdxWqwnunbfPbWkQAihUxut1mACjzFBdY8HzxeK9XPMuPmWdVpPMIToKGgNE8IkSzsoUHszEfNg+KxTV8vXYOJoR2ZJITfybGRFq+eYbuGKRWGaCU+Y2ds6kUNRL22+Q/POTISRK3El8npMNDmmNF7098bnaSagwwZBA+LpyEpqwiYibYyZwRYz4b3rhATdYJxPZcLPLjLl2NxhQ2Y1iwR2QCKTwAs5IQ0SlgpIaZc7yCkvXRbF0hsJDSQcvfRDogVCjwGBCUV4tqeGqMPmwO+txMeB0BjRm7gqSgQuXxJ9IqonD3UtmuPjp/JgZ+bBjvEQr7DML4gSr85DlKqC05m2Wt035QdLX7WRcikQ0BsIj/sO+fWA8GObAuu/peNsok3SQVkWIPBJB2pUUxRop4zamLnUgRL2jqpiVDSR0o2Iw0TbyniVg5uuCTHKs/klaQcHGhdSaJqJUnMpxdZt+FOn3iImeseXmKPexaj8SlXC89h/SuM/UVFn1zjIOiTaHFcCUy0TUQ/CWZC0mYTL8u3k1LsJE7/vbj3Zf+q5vYbDMSbqLlHP89LSO+wSrcvUNslEwbpaxN7mVLp4PyBJZymBcSGs2TtKiUMkms3kuSkTwjq/m5xKhzUOLPzSbrRIeG21Odjno8ulhPYmIvgXLIAdi0QbUjkdKxKYxGG+KZdzByKgiL3Y0pFHRFhtW6TcJBIlFV7uN6xzIIHxrSGA2AKJf/wPWvB/4NlMAAA=",
"isBase64Encoded": True,
}
lambda_handler(payload, {})
print(lambda_handler(payload, {}))

View File

@ -58,7 +58,6 @@ async def upload(event, context):
]
for filename in filenames:
print(filename)
tasks.append(s3.put_object(
ACL="public-read",
Bucket=BUCKET,

View File

@ -70,7 +70,7 @@ resource "aws_lambda_function" "sqs_to_elk" {
role = aws_iam_role.sqs_to_elk.arn
runtime = "python3.8"
timeout = 5
reserved_concurrent_executions = 30
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = aws_route53_record.Route53RecordSet7.fqdn

View File

@ -81,7 +81,7 @@ resource "aws_lambda_function" "sqs_to_s3" {
role = aws_iam_role.sqs_to_s3.arn
runtime = "python3.8"
timeout = 30
reserved_concurrent_executions = 30
reserved_concurrent_executions = 100
vpc_config {
security_group_ids = ["sg-772f357f"]
subnet_ids = ["subnet-5c34ec6d", "subnet-7b1c3836", "subnet-204b052e", "subnet-de4ddeff", "subnet-408d1c1f", "subnet-a7f460c1"]