diff --git a/lambda/sns_to_mqtt/__init__.py b/lambda/sns_to_mqtt/__init__.py index 6ad4362..4240fa0 100644 --- a/lambda/sns_to_mqtt/__init__.py +++ b/lambda/sns_to_mqtt/__init__.py @@ -23,6 +23,9 @@ logs = boto3.client('logs') sequenceToken = None log_stream_name = str(uuid.uuid4()) +MAX_CACHE = 10000 # how many serials should we cache +cache = {} + def handle_error(message, event, stream_name): global sequenceToken print(message) @@ -125,6 +128,17 @@ def lambda_handler(event, context): qos=0, retain=False ) + if serial not in cache: # low bandwidth feeds with just the first packet + client.publish( + topic=f'{os.getenv("MQTT_PREFIX")}-new/{serial}', + payload=body, + qos=0, + retain=False + ) + cache[serial] = body + # clean up cache if its too long + while len(cache) > MAX_CACHE: + del cache[next(iter(cache))] client.publish( topic=os.getenv("MQTT_BATCH"), payload=json.dumps(payloads),