mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2024-12-26 08:11:08 +00:00
52 lines
1.7 KiB
Python
52 lines
1.7 KiB
Python
import json
|
|
from botocore.awsrequest import AWSRequest
|
|
from botocore.endpoint import URLLib3Session
|
|
from botocore.auth import SigV4Auth
|
|
import boto3
|
|
import botocore.credentials
|
|
import os
|
|
|
|
HOST = os.getenv("ES")
|
|
|
|
def es_request(payload, path, method):
|
|
# get aws creds
|
|
session = boto3.Session()
|
|
|
|
headers = {"Host": HOST, "Content-Type": "application/json"}
|
|
request = AWSRequest(
|
|
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers
|
|
)
|
|
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
|
|
|
|
session = URLLib3Session()
|
|
r = session.send(request.prepare())
|
|
return json.loads(r.text)
|
|
|
|
|
|
def lambda_handler(event, context):
|
|
payloads = {}
|
|
for record in event['Records']:
|
|
sns_message = json.loads(record["body"])
|
|
payload = json.loads(sns_message["Message"])
|
|
index = payload['datetime'][:7]
|
|
|
|
if index not in payloads: # create index if not exists
|
|
payloads[index] = []
|
|
|
|
payloads[index].append(payload)
|
|
|
|
for index in payloads:
|
|
body=""
|
|
for payload in payloads[index]:
|
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
|
body += "\n"
|
|
|
|
result = es_request(body, f"telm-{index}/_doc/_bulk", "POST")
|
|
if 'errors' in result and result['errors'] == True:
|
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
|
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
|
|
if error_types:
|
|
print(event)
|
|
print(result)
|
|
raise RuntimeError
|
|
|