mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2024-12-19 05:07:55 +00:00
Batch up payloads in sns messages to reduce SQS and SNS spend
This commit is contained in:
parent
27824549af
commit
81b85fb05e
@ -14,19 +14,7 @@ import threading
|
||||
from email.utils import parsedate
|
||||
import os
|
||||
import asyncio
|
||||
import aioboto3
|
||||
|
||||
import asyncio
|
||||
from functools import wraps, partial
|
||||
|
||||
def async_wrap(func):
|
||||
@wraps(func)
|
||||
async def run(*args, loop=None, executor=None, **kwargs):
|
||||
if loop is None:
|
||||
loop = asyncio.get_event_loop()
|
||||
pfunc = partial(func, *args, **kwargs)
|
||||
return await loop.run_in_executor(executor, pfunc)
|
||||
return run
|
||||
|
||||
# this needs a bunch of refactor but the general approach is
|
||||
# connect to mqtt via websockets during init
|
||||
@ -48,9 +36,6 @@ event_loop_group = io.EventLoopGroup(1)
|
||||
host_resolver = io.DefaultHostResolver(event_loop_group)
|
||||
|
||||
|
||||
#io.init_logging(io.LogLevel.Error, "stderr")
|
||||
|
||||
|
||||
def connect():
|
||||
global connect_future, mqtt_connection
|
||||
session = boto3.session.Session()
|
||||
@ -75,7 +60,6 @@ connect()
|
||||
sns = boto3.client("sns",region_name="us-east-1")
|
||||
sns.meta.events.register('request-created.dynamodb', set_connection_header)
|
||||
|
||||
@async_wrap
|
||||
def post(payload):
|
||||
sns.publish(
|
||||
TopicArn=os.getenv("SNS_TOPIC"),
|
||||
@ -83,68 +67,68 @@ def post(payload):
|
||||
)
|
||||
|
||||
async def upload(event, context):
|
||||
global connect_future, mqtt_connection
|
||||
|
||||
tasks = []
|
||||
# Future.result() waits until a result is available
|
||||
global connect_future, mqtt_connection
|
||||
|
||||
tasks = []
|
||||
# Future.result() waits until a result is available
|
||||
try:
|
||||
connect_future.result()
|
||||
except:
|
||||
connect()
|
||||
|
||||
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
|
||||
event["body"] = base64.b64decode(event["body"])
|
||||
if (
|
||||
"content-encoding" in event["headers"]
|
||||
and event["headers"]["content-encoding"] == "gzip"
|
||||
):
|
||||
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
|
||||
time_delta = None
|
||||
if "date" in event["headers"]:
|
||||
try:
|
||||
connect_future.result()
|
||||
time_delta_header = event["headers"]["date"]
|
||||
time_delta = (
|
||||
datetime.datetime(*parsedate(time_delta_header)[:7])
|
||||
- datetime.datetime.utcnow()
|
||||
).total_seconds()
|
||||
except:
|
||||
pass
|
||||
payloads = json.loads(event["body"])
|
||||
to_sns = []
|
||||
first = False
|
||||
for payload in payloads:
|
||||
if "user-agent" in event["headers"]:
|
||||
event["time_server"] = datetime.datetime.now().isoformat()
|
||||
payload["user-agent"] = event["headers"]["user-agent"]
|
||||
payload["position"] = f'{payload["lat"]},{payload["lon"]}'
|
||||
if time_delta:
|
||||
payload["upload_time_delta"] = time_delta
|
||||
if "uploader_position" in payload:
|
||||
if not payload["uploader_position"]:
|
||||
payload.pop("uploader_position")
|
||||
else:
|
||||
(payload["uploader_alt"], payload["uploader_position"]) = (
|
||||
payload["uploader_position"][2],
|
||||
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
|
||||
)
|
||||
to_sns.append(payload)
|
||||
(msg, x) = mqtt_connection.publish(
|
||||
topic=f'sondes/{payload["serial"]}',
|
||||
payload=json.dumps(payload),
|
||||
qos=mqtt.QoS.AT_MOST_ONCE,
|
||||
)
|
||||
try:
|
||||
msg.result()
|
||||
except (RuntimeError, AwsCrtError):
|
||||
connect()
|
||||
|
||||
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
|
||||
event["body"] = base64.b64decode(event["body"])
|
||||
if (
|
||||
"content-encoding" in event["headers"]
|
||||
and event["headers"]["content-encoding"] == "gzip"
|
||||
):
|
||||
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
|
||||
time_delta = None
|
||||
if "date" in event["headers"]:
|
||||
try:
|
||||
time_delta_header = event["headers"]["date"]
|
||||
time_delta = (
|
||||
datetime.datetime(*parsedate(time_delta_header)[:7])
|
||||
- datetime.datetime.utcnow()
|
||||
).total_seconds()
|
||||
except:
|
||||
pass
|
||||
payloads = json.loads(event["body"])
|
||||
|
||||
first = False
|
||||
for payload in payloads:
|
||||
if "user-agent" in event["headers"]:
|
||||
event["time_server"] = datetime.datetime.now().isoformat()
|
||||
payload["user-agent"] = event["headers"]["user-agent"]
|
||||
payload["position"] = f'{payload["lat"]},{payload["lon"]}'
|
||||
if time_delta:
|
||||
payload["upload_time_delta"] = time_delta
|
||||
if "uploader_position" in payload:
|
||||
if not payload["uploader_position"]:
|
||||
payload.pop("uploader_position")
|
||||
else:
|
||||
(payload["uploader_alt"], payload["uploader_position"]) = (
|
||||
payload["uploader_position"][2],
|
||||
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
|
||||
)
|
||||
tasks.append(post(payload))
|
||||
(msg, x) = mqtt_connection.publish(
|
||||
topic=f'sondes/{payload["serial"]}',
|
||||
payload=json.dumps(payload),
|
||||
qos=mqtt.QoS.AT_MOST_ONCE,
|
||||
)
|
||||
try:
|
||||
msg.result()
|
||||
except (RuntimeError, AwsCrtError):
|
||||
connect()
|
||||
(msg, x) = mqtt_connection.publish(
|
||||
topic=f'sondes/{payload["serial"]}',
|
||||
payload=json.dumps(payload),
|
||||
qos=mqtt.QoS.AT_MOST_ONCE,
|
||||
)
|
||||
msg.result()
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
msg.result()
|
||||
|
||||
post(to_sns)
|
||||
def lambda_handler(event, context):
|
||||
asyncio.run(upload(event, context))
|
||||
return {"statusCode": 200, "body": "^v^ telm logged"}
|
||||
|
@ -45,25 +45,29 @@ async def upload(event, context):
|
||||
payloads = {}
|
||||
for record in event['Records']:
|
||||
sns_message = json.loads(record["body"])
|
||||
payload = json.loads(sns_message["Message"])
|
||||
if type(json.loads(sns_message["Message"])) == dict:
|
||||
incoming_payloads = [json.loads(sns_message["Message"])]
|
||||
else:
|
||||
incoming_payloads = json.loads(sns_message["Message"])
|
||||
for payload in incoming_payloads:
|
||||
|
||||
body = json.dumps(payload)
|
||||
id = str(uuid.uuid4())
|
||||
hash = hashlib.sha256(values_to_hash(payload).encode("utf-8")).hexdigest()
|
||||
|
||||
body = json.dumps(payload)
|
||||
id = str(uuid.uuid4())
|
||||
hash = hashlib.sha256(values_to_hash(payload).encode("utf-8")).hexdigest()
|
||||
|
||||
filenames = [
|
||||
f"date/{payload['datetime']}-{payload['serial']}-{id}.json",
|
||||
f"serial/{payload['serial']}/{payload['datetime']}-{id}.json",
|
||||
f"serial-hashed/{payload['serial']}/{payload['datetime']}-{hash}.json"
|
||||
]
|
||||
|
||||
for filename in filenames:
|
||||
tasks.append(s3.put_object(
|
||||
ACL="public-read",
|
||||
Bucket=BUCKET,
|
||||
Body=body,
|
||||
Key=filename
|
||||
))
|
||||
filenames = [
|
||||
f"date/{payload['datetime']}-{payload['serial']}-{id}.json",
|
||||
f"serial/{payload['serial']}/{payload['datetime']}-{id}.json",
|
||||
f"serial-hashed/{payload['serial']}/{payload['datetime']}-{hash}.json"
|
||||
]
|
||||
|
||||
for filename in filenames:
|
||||
tasks.append(s3.put_object(
|
||||
ACL="public-read",
|
||||
Bucket=BUCKET,
|
||||
Body=body,
|
||||
Key=filename
|
||||
))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
@ -79,7 +83,7 @@ if __name__ == "__main__":
|
||||
{
|
||||
"messageId": "262d4090-e23b-4907-b677-3c94334dc899",
|
||||
"receiptHandle": "AQEBL1FXHS4m+Om59KZH9ayxC5VBqDEDh6DgXUZuBhV2uQJS312bhOTpLvptuCCIWaeLkfHU+7NajqV2kTVhnz5lehE/zfQ8OU1jqqm+cHxyul99MxA7K7+C+ww2Ri9KSbgaAgqvZzcLbwpW8rP0MNhrBcIQAE5Pz1urfTZKx1RVnv/XQHbR2ARPwocOzk2yEexa0y2f7FedS4F10gju8Ypp0Zr4DSRb1zUkES3QJGiSJakaO1QJT5npRySjAd0CUSPXw7IDTejolfGkItQG5eMRx0enELTUDv8LPsHJkr7ha3DHNfbvxTtdk406nWFn8U8DW515emp7+Y+AD469OnceIMdVC62GHwrpMkedXzLEH0C8TOXHQ+WuRkhR1dauwKqO",
|
||||
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"65147554-e06d-5324-a87d-2da107fea807\",\n \"TopicArn\" : \"arn:aws:sns:us-east-1:143841941773:sonde-telem\",\n \"Message\" : \"{\\\"software_name\\\":\\\"radiosonde_auto_rx\\\",\\\"software_version\\\":\\\"1.5.1\\\",\\\"uploader_callsign\\\":\\\"BIOWL1\\\",\\\"uploader_position\\\":\\\"52.014417,8.47351\\\",\\\"uploader_antenna\\\":\\\"SirioCX395\\\",\\\"time_received\\\":\\\"2021-04-18T07:52:37.196266Z\\\",\\\"datetime\\\":\\\"2021-04-18T07:52:53.001000Z\\\",\\\"manufacturer\\\":\\\"Vaisala\\\",\\\"type\\\":\\\"RS41\\\",\\\"serial\\\":\\\"meowmeowtest\\\",\\\"subtype\\\":\\\"RS41-SGP\\\",\\\"frame\\\":12781,\\\"lat\\\":50.65064,\\\"lon\\\":6.60805,\\\"alt\\\":2954.44289,\\\"temp\\\":-9.3,\\\"humidity\\\":75.4,\\\"pressure\\\":709.79,\\\"vel_v\\\":-2.85326,\\\"vel_h\\\":8.53055,\\\"heading\\\":236.0122,\\\"sats\\\":9,\\\"batt\\\":2.7,\\\"frequency\\\":405.3,\\\"burst_timer\\\":25423,\\\"snr\\\":12.5,\\\"user-agent\\\":\\\"Amazon CloudFront\\\",\\\"position\\\":\\\"50.65064,6.60805\\\",\\\"upload_time_delta\\\":-0.713689,\\\"uploader_alt\\\":340}\",\n \"Timestamp\" : \"2021-04-18T07:52:51.776Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"qXuYwDAGPYYLjKXfDtF69AWKDEhhz9MXlqxO2nBwJ/dgOqNSUZtDPqOYSuge3jVCoTSRY5qGw38gg2G+JnEbJd8SVvp9GRsFre8MKWu8T0obq3rj8S0YAh7dTqi4EILIMmi2KziasCDQlrVuZvCSgPnC+hYF3GByI626QW6m3a4E2igclvbE+O6x6qvVDKwmf/eh+8LRiH1PCrEckiXthnr+qOCiTcstyZoOqMOShJBun9k0DK07+Yf1tYDPSHnqZSIaOvAMSjIKKXfGCkel3SWieO7Zgk7xQuo9Z1bcV8Miu4uEvge4G9HKU3S41zaVcQjYvEhQLxxgd1x3HxXImA==\",\n \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:143841941773:sonde-telem:1a52ac41-6e17-43da-bfb6-114577c94ca6\"\n}",
|
||||
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"65147554-e06d-5324-a87d-2da107fea807\",\n \"TopicArn\" : \"arn:aws:sns:us-east-1:143841941773:sonde-telem\",\n \"Message\" : \"[{\\\"software_name\\\":\\\"radiosonde_auto_rx\\\",\\\"software_version\\\":\\\"1.5.1\\\",\\\"uploader_callsign\\\":\\\"BIOWL1\\\",\\\"uploader_position\\\":\\\"52.014417,8.47351\\\",\\\"uploader_antenna\\\":\\\"SirioCX395\\\",\\\"time_received\\\":\\\"2021-04-18T07:52:37.196266Z\\\",\\\"datetime\\\":\\\"2021-04-18T07:52:53.001000Z\\\",\\\"manufacturer\\\":\\\"Vaisala\\\",\\\"type\\\":\\\"RS41\\\",\\\"serial\\\":\\\"meowmeowtest\\\",\\\"subtype\\\":\\\"RS41-SGP\\\",\\\"frame\\\":12781,\\\"lat\\\":50.65064,\\\"lon\\\":6.60805,\\\"alt\\\":2954.44289,\\\"temp\\\":-9.3,\\\"humidity\\\":75.4,\\\"pressure\\\":709.79,\\\"vel_v\\\":-2.85326,\\\"vel_h\\\":8.53055,\\\"heading\\\":236.0122,\\\"sats\\\":9,\\\"batt\\\":2.7,\\\"frequency\\\":405.3,\\\"burst_timer\\\":25423,\\\"snr\\\":12.5,\\\"user-agent\\\":\\\"Amazon CloudFront\\\",\\\"position\\\":\\\"50.65064,6.60805\\\",\\\"upload_time_delta\\\":-0.713689,\\\"uploader_alt\\\":340}]\",\n \"Timestamp\" : \"2021-04-18T07:52:51.776Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"qXuYwDAGPYYLjKXfDtF69AWKDEhhz9MXlqxO2nBwJ/dgOqNSUZtDPqOYSuge3jVCoTSRY5qGw38gg2G+JnEbJd8SVvp9GRsFre8MKWu8T0obq3rj8S0YAh7dTqi4EILIMmi2KziasCDQlrVuZvCSgPnC+hYF3GByI626QW6m3a4E2igclvbE+O6x6qvVDKwmf/eh+8LRiH1PCrEckiXthnr+qOCiTcstyZoOqMOShJBun9k0DK07+Yf1tYDPSHnqZSIaOvAMSjIKKXfGCkel3SWieO7Zgk7xQuo9Z1bcV8Miu4uEvge4G9HKU3S41zaVcQjYvEhQLxxgd1x3HxXImA==\",\n \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:143841941773:sonde-telem:1a52ac41-6e17-43da-bfb6-114577c94ca6\"\n}",
|
||||
"attributes": {
|
||||
"ApproximateReceiveCount": "2",
|
||||
"SentTimestamp": "1618732371814",
|
||||
|
@ -27,13 +27,17 @@ def lambda_handler(event, context):
|
||||
payloads = {}
|
||||
for record in event['Records']:
|
||||
sns_message = json.loads(record["body"])
|
||||
payload = json.loads(sns_message["Message"])
|
||||
index = payload['datetime'][:7]
|
||||
|
||||
if index not in payloads: # create index if not exists
|
||||
payloads[index] = []
|
||||
if type(json.loads(sns_message["Message"])) == dict:
|
||||
incoming_payloads = [json.loads(sns_message["Message"])]
|
||||
else:
|
||||
incoming_payloads = json.loads(sns_message["Message"])
|
||||
for payload in incoming_payloads:
|
||||
index = payload['datetime'][:7]
|
||||
|
||||
payloads[index].append(payload)
|
||||
if index not in payloads: # create index if not exists
|
||||
payloads[index] = []
|
||||
|
||||
payloads[index].append(payload)
|
||||
|
||||
for index in payloads:
|
||||
body=""
|
||||
@ -49,4 +53,3 @@ def lambda_handler(event, context):
|
||||
print(event)
|
||||
print(result)
|
||||
raise RuntimeError
|
||||
|
Loading…
Reference in New Issue
Block a user