sondehub-infra/lambda/sqs_to_elk/__init__.py

35 lines
1.2 KiB
Python
Raw Normal View History

2021-05-20 08:43:50 +00:00
import json
2021-12-20 06:02:02 +00:00
import es
2022-02-13 00:08:45 +00:00
import zlib
import base64
import datetime
2021-05-20 08:43:50 +00:00
def lambda_handler(event, context):
payloads = []
2021-05-20 08:43:50 +00:00
for record in event['Records']:
sns_message = json.loads(record["body"])
2022-02-13 00:08:45 +00:00
try:
decoded = json.loads(zlib.decompress(base64.b64decode(sns_message["Message"]), 16 + zlib.MAX_WBITS))
except:
decoded = json.loads(sns_message["Message"])
if type(decoded) == dict:
incoming_payloads = [decoded]
else:
2022-02-13 00:08:45 +00:00
incoming_payloads = decoded
year, week = datetime.datetime.now().isocalendar()[:2]
index = f"{year}-{week}"
payloads += incoming_payloads
body=""
for payload in payloads:
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
body += "\n"
2021-05-20 08:43:50 +00:00
2022-11-20 02:15:53 +00:00
result = es.request(body, f"telm-{index}/_bulk", "POST")
if 'errors' in result and result['errors'] == True:
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
print(event)
print(result)
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
if error_types:
raise RuntimeError