sondehub-infra/lambda/ham_sqs_to_elk/__init__.py

38 lines
1.4 KiB
Python
Raw Permalink Normal View History

2022-01-12 10:00:42 +00:00
import json
import es
def lambda_handler(event, context):
    """Bulk-index the telemetry payloads of an event batch into Elasticsearch.

    Every entry of ``event["Records"]`` carries a JSON ``body`` whose
    ``Message`` field is itself JSON: either a single payload object or a
    list of payload objects.  Payloads are grouped by the first seven
    characters of their ``datetime`` field, and each group is posted as one
    ``_bulk`` request to ``ham-telm-<prefix>``.

    Args:
        event: Mapping with a ``Records`` list; each record has a ``body``
            string as described above.
        context: Not used by this handler.

    Raises:
        KeyError: If a payload has no ``datetime`` field (or the event /
            record lacks the keys read above).
        RuntimeError: If a bulk response reports at least one item error
            whose type is not ``mapper_parsing_exception``.
    """
    payloads = {}
    for record in event["Records"]:
        sns_message = json.loads(record["body"])
        # Decode the inner message once instead of once per use.
        message = json.loads(sns_message["Message"])
        incoming_payloads = [message] if isinstance(message, dict) else message

        for payload in incoming_payloads:
            index = payload["datetime"][:7]
            try:
                # Combined "lat,lon" string (original note: "make elk happy
                # location").
                payload["position"] = f'{payload["lat"]},{payload["lon"]}'
            except KeyError:
                # Payload has no coordinates: index it without a position.
                pass
            payloads.setdefault(index, []).append(payload)

    for index, documents in payloads.items():
        # One action line plus one document line per payload, then a final
        # blank line terminating the bulk body.
        body = "".join(
            '{"index":{}}\n' + json.dumps(document) + "\n"
            for document in documents
        ) + "\n"

        result = es.request(body, f"ham-telm-{index}/_bulk", "POST")
        if "errors" in result and result["errors"]:
            # Collect the error type of every failed item.
            error_types = [
                item["index"]["error"]["type"]
                for item in result["items"]
                if "error" in item["index"]
            ]
            print(event)
            print(result)
            # Mapper failures will never succeed, so they must not fail the
            # invocation; anything else is raised.
            retryable = [
                error_type
                for error_type in error_types
                if error_type != "mapper_parsing_exception"
            ]
            if retryable:
                raise RuntimeError(
                    f"bulk indexing into ham-telm-{index} failed: "
                    f"{sorted(set(retryable))}"
                )