sondehub-infra/lambda/es/__init__.py
Michaela Wheeler 512e366607 refactor
2021-12-20 17:02:02 +11:00

35 lines
972 B
Python

import boto3
import gzip
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
from io import BytesIO
import json
import os
es_session = URLLib3Session()
ES_HOST = os.getenv("ES")
def request(payload, path, method, params=None):
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": ES_HOST, "Content-Type": "application/json",
"Content-Encoding": "gzip"}
request = AWSRequest(
method=method, url=f"https://{ES_HOST}/{path}", data=payload, headers=headers, params=params
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
r = es_session.send(request.prepare())
if r.status_code != 200 and r.status_code != 201:
raise RuntimeError
return json.loads(r.text)