84 lines
2.3 KiB
Python
Raw Normal View History

2021-12-20 17:02:02 +11:00
import sys
2022-01-17 10:19:56 +11:00
sys.path.append("sns_to_mqtt/vendor")
2021-12-20 17:02:02 +11:00
import json
import os
import paho.mqtt.client as mqtt
import time
import random
2022-02-13 11:08:45 +11:00
import zlib
import base64
2021-12-20 17:02:02 +11:00
# Module-level MQTT client shared by every function in this file; it is
# configured to use the WebSockets transport.
client = mqtt.Client(transport="websockets")
# Connection state: set to True by on_connect (when rc == 0) and back to
# False by on_disconnect. lambda_handler polls it before publishing.
connected_flag = False
import socket
# Give sockets created from here on a 1-second default timeout.
socket.setdefaulttimeout(1)
2021-12-20 17:02:02 +11:00
def connect():
    """Configure the shared MQTT client and start connecting to a broker.

    Registers the module's callbacks on ``client``, applies the credentials
    from ``MQTT_USERNAME`` / ``MQTT_PASSWORD``, picks one host at random from
    the comma-separated ``MQTT_HOST`` environment variable, connects to it on
    port 8080 and starts the client's background network loop.
    """
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_publish = on_publish
    #client.tls_set()

    mqtt_user = os.getenv("MQTT_USERNAME")
    mqtt_password = os.getenv("MQTT_PASSWORD")
    client.username_pw_set(username=mqtt_user, password=mqtt_password)

    # MQTT_HOST may list several brokers; one is chosen at random per call.
    candidate_hosts = os.getenv("MQTT_HOST").split(",")
    broker = random.choice(candidate_hosts)
    print(f"Connecting to {broker}")
    # Positional arguments are host, port and (in paho-mqtt) keepalive seconds.
    client.connect(broker, 8080, 5)

    client.loop_start()
    print("loop started")
def on_disconnect(client, userdata, rc):
    """Disconnect callback: log the event and mark the client as offline.

    All three arguments are supplied by the MQTT client and are unused here.
    """
    global connected_flag
    print("disconnected")
    # Publishers poll this flag and wait while it is False.
    connected_flag = False
def on_connect(client, userdata, flags, rc):
    """Connect callback: mark the client as online when the result code is 0.

    Any non-zero ``rc`` is only logged; the module-level ``connected_flag``
    is left unchanged in that case.
    """
    global connected_flag
    if rc != 0:
        print("Bad connection Returned code=", rc)
        return
    print("connected")
    connected_flag = True
def on_publish(client, userdata, mid):
    """Publish callback; intentionally does nothing."""
    return None
# Connect at import time, so the broker connection is set up once when the
# module is loaded rather than on every lambda_handler call.
connect()
def _decode_sns_message(message):
    """Return the JSON document carried by an SNS ``Message`` string.

    The message is first treated as base64-encoded gzip data. If it cannot be
    decoded that way, it is parsed as plain JSON text instead.
    """
    try:
        compressed = base64.b64decode(message)
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        return json.loads(zlib.decompress(compressed, 16 + zlib.MAX_WBITS))
    except (ValueError, zlib.error):
        # ValueError covers binascii.Error (bad base64), UnicodeDecodeError
        # and json.JSONDecodeError; zlib.error covers non-gzip data.
        return json.loads(message)


def lambda_handler(event, context):
    """Forward the payloads of every SNS record in ``event`` to MQTT.

    For each record the SNS message is decoded (gzip+base64 JSON, or plain
    JSON) into either a single dict or a list of dicts. Each payload is
    published on ``<MQTT_PREFIX>/<payload[MQTT_ID]>`` and the record's whole
    payload list is then published on the ``MQTT_BATCH`` topic.

    Args:
        event: Lambda event containing a ``"Records"`` list whose items hold
            an ``"Sns"`` mapping with a ``"Message"`` string.
        context: Lambda context object (unused).

    Raises:
        KeyError: if a record lacks ``"Sns"``/``"Message"`` or a payload lacks
            the key named by ``MQTT_ID``.
        ValueError: if a message is neither gzip+base64 JSON nor plain JSON.
    """
    client.loop(timeout=0.05, max_packets=1)  # make sure it reconnects

    # The environment does not change while the handler runs; read it once.
    id_key = os.getenv("MQTT_ID")
    topic_prefix = os.getenv("MQTT_PREFIX")
    batch_topic = os.getenv("MQTT_BATCH")

    for record in event["Records"]:
        decoded = _decode_sns_message(record["Sns"]["Message"])
        # A single object is handled as a one-element batch.
        payloads = [decoded] if isinstance(decoded, dict) else decoded

        for payload in payloads:
            body = json.dumps(payload)
            serial = payload[id_key]
            # Block until on_connect has set the module-level flag.
            while not connected_flag:
                time.sleep(0.01)
            client.publish(
                topic=f"{topic_prefix}/{serial}",
                payload=body,
                qos=0,
                retain=False,
            )

        # NOTE(review): published once per record because ``payloads`` is
        # per-record state; confirm this nesting against the deployed code.
        client.publish(
            topic=batch_topic,
            payload=json.dumps(payloads),
            qos=0,
            retain=False,
        )

    # Give paho-mqtt 50 ms to send the queued messages before returning.
    # This could be improved on, but paho-mqtt is awkward to interface with.
    time.sleep(0.05)