diff --git a/main.tf b/main.tf index 0d75ba7..5ac6da2 100644 --- a/main.tf +++ b/main.tf @@ -493,7 +493,7 @@ resource "aws_elasticsearch_domain" "ElasticsearchDomain" { elasticsearch_version = "7.9" cluster_config { dedicated_master_count = 3 - dedicated_master_enabled = true + dedicated_master_enabled = false dedicated_master_type = "t3.small.elasticsearch" instance_count = 2 instance_type = "m5.large.elasticsearch" diff --git a/sonde-api-to-iot-core/lambda_function.py b/sonde-api-to-iot-core/lambda_function.py index d97c8b5..3c66dad 100644 --- a/sonde-api-to-iot-core/lambda_function.py +++ b/sonde-api-to-iot-core/lambda_function.py @@ -36,27 +36,37 @@ patch_all() event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) -client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) -credentials_provider = auth.AwsCredentialsProvider.new_default_chain(client_bootstrap) + io.init_logging(io.LogLevel.Error, 'stderr') -session = boto3.session.Session() -mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing( -endpoint="a2sgq5szfqum7p-ats.iot.us-east-1.amazonaws.com", -client_bootstrap=client_bootstrap, -region="us-east-1", -credentials_provider=credentials_provider, -client_id=str(uuid.uuid4()), -clean_session=False, -keep_alive_secs=6) -connect_future = mqtt_connection.connect() + +def connect(): + global connect_future, mqtt_connection + session = boto3.session.Session() + client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) + credentials_provider = auth.AwsCredentialsProvider.new_default_chain(client_bootstrap) + mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing( + endpoint="a2sgq5szfqum7p-ats.iot.us-east-1.amazonaws.com", + client_bootstrap=client_bootstrap, + region="us-east-1", + credentials_provider=credentials_provider, + client_id=str(uuid.uuid4()), + clean_session=False, + keep_alive_secs=6) + connect_future = mqtt_connection.connect() + connect_future.result() + +connect() def lambda_handler(event, context): - global connect_future, mqtt_connection + # Future.result() waits until a result is available - connect_future.result() + try: + connect_future.result() + except: + connect() print(json.dumps(event)) if "isBase64Encoded" in event and event["isBase64Encoded"] == True: @@ -84,7 +94,8 @@ def lambda_handler(event, context): if "uploader_position" in payload: if not payload["uploader_position"]: payload.pop("uploader_position") - (payload["uploader_alt"], payload["uploader_position"]) = payload["uploader_position"][2], f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}" + else: + (payload["uploader_alt"], payload["uploader_position"]) = payload["uploader_position"][2], f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}" (msg, x) = mqtt_connection.publish( topic=f'sondes/{payload["serial"]}', payload=json.dumps(payload), @@ -92,20 +103,11 @@ def lambda_handler(event, context): try: msg.result() except (RuntimeError, AwsCrtError): - mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing( - endpoint="a2sgq5szfqum7p-ats.iot.us-east-1.amazonaws.com", - client_bootstrap=client_bootstrap, - region="us-east-1", - credentials_provider=credentials_provider, - client_id=str(uuid.uuid4()), - clean_session=False, - keep_alive_secs=6) - connect_future = mqtt_connection.connect() - connect_future.result() + connect() (msg, x) = mqtt_connection.publish( topic=f'sondes/{payload["serial"]}', payload=json.dumps(payload), - qos=mqtt.QoS.AT_LEAST_ONCE) + qos=mqtt.QoS.AT_MOST_ONCE) msg.result()