From d327519d2f6cb85d8b65050734d4d98c9beca102 Mon Sep 17 00:00:00 2001 From: xss Date: Tue, 1 Feb 2022 21:11:33 +1100 Subject: [PATCH] use gzip compression for receiving data from ES --- lambda/es/__init__.py | 14 ++++- lambda/query/__main__.py | 108 ++++++++++++++++----------------- lambda/sns_to_mqtt/__init__.py | 2 +- 3 files changed, 66 insertions(+), 58 deletions(-) diff --git a/lambda/es/__init__.py b/lambda/es/__init__.py index 3bc3548..ba54c73 100644 --- a/lambda/es/__init__.py +++ b/lambda/es/__init__.py @@ -1,3 +1,4 @@ +import zlib import boto3 import gzip from botocore.awsrequest import AWSRequest @@ -6,6 +7,7 @@ from botocore.auth import SigV4Auth from io import BytesIO import json import os +import zlib es_session = URLLib3Session() ES_HOST = os.getenv("ES") @@ -18,7 +20,7 @@ def request(payload, path, method, params=None): payload = compressed.getvalue() headers = {"Host": ES_HOST, "Content-Type": "application/json", - "Content-Encoding": "gzip"} + "Content-Encoding": "gzip", 'Accept-Encoding': 'gzip'} request = AWSRequest( method=method, url=f"https://{ES_HOST}/{path}", data=payload, headers=headers, params=params @@ -30,5 +32,11 @@ def request(payload, path, method, params=None): if r.status_code != 200 and r.status_code != 201: raise RuntimeError - - return json.loads(r.text) + + if ( + 'Content-Encoding' in r.headers + and r.headers['Content-Encoding'] == 'gzip' + ): + return json.loads(zlib.decompress(r.content, 16 + zlib.MAX_WBITS)) + else: + return json.loads(r.text) diff --git a/lambda/query/__main__.py b/lambda/query/__main__.py index ea54452..a01907b 100644 --- a/lambda/query/__main__.py +++ b/lambda/query/__main__.py @@ -1,49 +1,49 @@ from . import * #print(get_listener_telemetry({"queryStringParameters":{}}, {})) -print(telm_stats({ - "version": "2.0", - "routeKey": "GET /sondes", - "rawPath": "/sondes", - "rawQueryString": "lat=49.827648&lon=6.106842&distance=400000&last=-60", - "headers": { - "cache-control": "no-cache", - "content-length": "0", - "host": "api-raw.v2.sondehub.org", - "user-agent": "Amazon CloudFront", - "via": "1.1 ee4db0d243ceb0d1993e5f46ad6c0f01.cloudfront.net (CloudFront)", - "x-amz-cf-id": "KF68O6r-OP5oTosFLdix7-RWM6xeW08ZF48fgvwLkj9f3s4fJuCFKg==", - "x-amzn-trace-id": "Root=1-61d14df5-0f9dbfe563e89f170e65a3bf", - "x-forwarded-for": "94.252.35.58, 64.252.86.150", - "x-forwarded-port": "443", - "x-forwarded-proto": "https" - }, - "queryStringParameters": { - "distance": "400000", - "last": "-60", - "lat": "49.827648", - "lon": "6.106842" - }, - "requestContext": { - "accountId": "143841941773", - "apiId": "r03szwwq41", - "domainName": "api-raw.v2.sondehub.org", - "domainPrefix": "api-raw", - "http": { - "method": "GET", - "path": "/sondes", - "protocol": "HTTP/1.1", - "sourceIp": "94.252.35.58", - "userAgent": "Amazon CloudFront" - }, - "requestId": "LTkeXjgXIAMEVzw=", - "routeKey": "GET /sondes", - "stage": "$default", - "time": "02/Jan/2022:07:02:13 +0000", - "timeEpoch": 1641106933368 - }, - "isBase64Encoded": False -}, None)) +# print(telm_stats({ +# "version": "2.0", +# "routeKey": "GET /sondes", +# "rawPath": "/sondes", +# "rawQueryString": "lat=49.827648&lon=6.106842&distance=400000&last=-60", +# "headers": { +# "cache-control": "no-cache", +# "content-length": "0", +# "host": "api-raw.v2.sondehub.org", +# "user-agent": "Amazon CloudFront", +# "via": "1.1 ee4db0d243ceb0d1993e5f46ad6c0f01.cloudfront.net (CloudFront)", +# "x-amz-cf-id": "KF68O6r-OP5oTosFLdix7-RWM6xeW08ZF48fgvwLkj9f3s4fJuCFKg==", +# "x-amzn-trace-id": "Root=1-61d14df5-0f9dbfe563e89f170e65a3bf", +# "x-forwarded-for": "94.252.35.58, 64.252.86.150", +# "x-forwarded-port": "443", +# "x-forwarded-proto": "https" +# }, +# "queryStringParameters": { +# "distance": "400000", +# "last": "-60", +# "lat": "49.827648", +# "lon": "6.106842" +# }, +# "requestContext": { +# "accountId": "143841941773", +# "apiId": "r03szwwq41", +# "domainName": "api-raw.v2.sondehub.org", +# "domainPrefix": "api-raw", +# "http": { +# "method": "GET", +# "path": "/sondes", +# "protocol": "HTTP/1.1", +# "sourceIp": "94.252.35.58", +# "userAgent": "Amazon CloudFront" +# }, +# "requestId": "LTkeXjgXIAMEVzw=", +# "routeKey": "GET /sondes", +# "stage": "$default", +# "time": "02/Jan/2022:07:02:13 +0000", +# "timeEpoch": 1641106933368 +# }, +# "isBase64Encoded": False +# }, None)) # mode: 6hours # type: positions # format: json @@ -96,14 +96,14 @@ print(telm_stats({ # {}, # ) # ) -# print( -# get_telem( -# { -# "queryStringParameters":{ -# # "serial": "S3210639", -# "duration": "3h", -# # "datetime": "2021-07-26T06:49:29.001000Z" -# } -# }, {} -# ) -# ) +print( + get_telem( + { + "queryStringParameters":{ + # "serial": "S3210639", + "duration": "3h", + # "datetime": "2021-07-26T06:49:29.001000Z" + } + }, {} + ) +) diff --git a/lambda/sns_to_mqtt/__init__.py b/lambda/sns_to_mqtt/__init__.py index fbb7111..4cc7ee9 100644 --- a/lambda/sns_to_mqtt/__init__.py +++ b/lambda/sns_to_mqtt/__init__.py @@ -47,7 +47,7 @@ def lambda_handler(event, context): incoming_payloads = json.loads(sns_message["Message"]) #send only the first, last and every 5th packet - payloads = [incoming_payloads[0]] + incoming_payloads[1:-1:5][1:] + [incoming_payloads[-1]] + payloads = [incoming_payloads[-1]] for payload in payloads: body = json.dumps(payload)