2021-05-20 08:43:50 +00:00
|
|
|
import json
|
2021-12-20 06:02:02 +00:00
|
|
|
import es
|
2022-02-13 00:08:45 +00:00
|
|
|
import zlib
|
|
|
|
import base64
|
2021-05-20 08:43:50 +00:00
|
|
|
|
|
|
|
|
|
|
|
def lambda_handler(event, context):
|
|
|
|
payloads = {}
|
|
|
|
for record in event['Records']:
|
|
|
|
sns_message = json.loads(record["body"])
|
2022-02-13 00:08:45 +00:00
|
|
|
try:
|
|
|
|
decoded = json.loads(zlib.decompress(base64.b64decode(sns_message["Message"]), 16 + zlib.MAX_WBITS))
|
|
|
|
except:
|
|
|
|
decoded = json.loads(sns_message["Message"])
|
|
|
|
if type(decoded) == dict:
|
|
|
|
incoming_payloads = [decoded]
|
2021-05-24 10:44:29 +00:00
|
|
|
else:
|
2022-02-13 00:08:45 +00:00
|
|
|
incoming_payloads = decoded
|
2021-05-24 10:44:29 +00:00
|
|
|
for payload in incoming_payloads:
|
|
|
|
index = payload['datetime'][:7]
|
2021-05-20 08:43:50 +00:00
|
|
|
|
2021-05-24 10:44:29 +00:00
|
|
|
if index not in payloads: # create index if not exists
|
|
|
|
payloads[index] = []
|
|
|
|
|
|
|
|
payloads[index].append(payload)
|
2021-05-20 08:43:50 +00:00
|
|
|
|
|
|
|
for index in payloads:
|
|
|
|
body=""
|
|
|
|
for payload in payloads[index]:
|
|
|
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
|
|
|
body += "\n"
|
|
|
|
|
2021-12-20 06:02:02 +00:00
|
|
|
result = es.request(body, f"telm-{index}/_doc/_bulk", "POST")
|
2021-05-20 08:43:50 +00:00
|
|
|
if 'errors' in result and result['errors'] == True:
|
|
|
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
2021-12-30 20:53:00 +00:00
|
|
|
print(event)
|
|
|
|
print(result)
|
2021-05-20 08:43:50 +00:00
|
|
|
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
|
|
|
|
if error_types:
|
2021-09-12 13:25:34 +00:00
|
|
|
raise RuntimeError
|