From a1d304457f9e5a1ae5331e1222d606762583b6ef Mon Sep 17 00:00:00 2001
From: xss
Date: Sun, 6 Mar 2022 10:17:35 +1100
Subject: [PATCH] History hacks

---
 lambda/historic_es_to_s3/__init__.py | 51 ++++++++++++++++------------
 lambda/historic_es_to_s3/__main__.py |  2 +-
 sqs_to_elk.tf                        |  3 +-
 3 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/lambda/historic_es_to_s3/__init__.py b/lambda/historic_es_to_s3/__init__.py
index e484f33..1f149f2 100644
--- a/lambda/historic_es_to_s3/__init__.py
+++ b/lambda/historic_es_to_s3/__init__.py
@@ -8,7 +8,7 @@
 BUCKET = "sondehub-history"
 s3 = boto3.resource('s3')
 
-def fetch_es(serial):
+def fetch_es(serial, s3_data):
     payload = {
         "size": 10000,
         "sort": [
@@ -36,7 +36,7 @@
     response = es.request(json.dumps(payload),
                           "telm-*/_search", "POST", params={"scroll": "1m"})
     try:
-        data += [x["_source"] for x in response['hits']['hits']]
+        add_unique(s3_data,[x["_source"] for x in response['hits']['hits']])
     except:
         print(response)
         raise
@@ -47,7 +47,7 @@
                               "_search/scroll", "POST")
         scroll_id = response['_scroll_id']
         scroll_ids.append(scroll_id)
-        data += [x["_source"] for x in response['hits']['hits']]
+        add_unique(s3_data,[x["_source"] for x in response['hits']['hits']])
     for scroll_id in scroll_ids:
         try:
             scroll_delete = es.request(json.dumps({"scroll_id": scroll_id }),
@@ -55,7 +55,10 @@
             print(scroll_delete)
         except RuntimeError:
             pass
-    return data
+
+def add_unique(s3_data, es_r):
+    for row in es_r:
+        s3_data.add(json.dumps(row, sort_keys=True))
 
 def fetch_s3(serial):
     try:
@@ -162,15 +165,14 @@ def fetch_launch_sites():
 
 def write_s3(serial, data, launch_sites):
     #get max alt
+    append_data = ""
     if serial in launch_sites:
-        for x in data:
-            x["launch_site"] = launch_sites[serial]["launch_site"]
-            x["launch_site_range_estimate"] = launch_sites[serial]["launch_site_range_estimate"]
-    max_alt = sorted(data, key=lambda k: int(k['alt']))[-1]
+        append_data = f"\"launch_site\": {launch_sites[serial]['launch_site']}, \"launch_site_range_estimate\": {launch_sites[serial]['launch_site_range_estimate']}"
+    max_alt = sorted(data, key=lambda k: json.loads(k)['alt'])[-1]
     summary = [
-        data[0],
-        max_alt,
-        data[-1]
+        json.loads(data[0]),
+        json.loads(max_alt),
+        json.loads(data[-1])
     ]
     metadata = {
         "first-lat": str(summary[0]['lat']),
@@ -186,7 +188,7 @@
     if serial in launch_sites:
         metadata["launch_site"] = launch_sites[serial]["launch_site"]
 
-    dates = set([x['datetime'].split("T")[0].replace("-","/") for x in data])
+    dates = set([json.loads(x)['datetime'].split("T")[0].replace("-","/") for x in data])
 
     for date in dates:
         object = s3.Object(BUCKET,f'date/{date}/{serial}.json')
@@ -201,18 +203,21 @@
             Body=json.dumps(summary).encode("utf-8"),
             Metadata=metadata
         )
-
-    gz_data = gzip.compress(json.dumps(data).encode('utf-8'))
+    output = "["
+    if append_data:
+        data = [ x[:-1] + ", " + append_data + "}" for x in data ]
+    output += ",".join(data)
+    output += "]"
+    data = gzip.compress(output.encode('utf-8'))
 
     object = s3.Object(BUCKET,f'serial/{serial}.json.gz')
     object.put(
-        Body=gz_data,
+        Body=data,
         ContentType='application/json',
         ContentEncoding='gzip',
         Metadata=metadata
     )
 
-
 def handler(event, context):
     print(json.dumps(event))
     payloads = {}
@@ -221,13 +226,15 @@
             serial = record["body"]
             print(f"Getting {serial} S3")
             s3_data = fetch_s3(serial)
+            s3_data = {json.dumps(x, sort_keys=True) for x in s3_data} # convert to a set
             print(f"Getting {serial} ES")
-            es = fetch_es(serial)
-            print(f"Combining data {serial}")
-            data = s3_data + es
-            data = [dict(t) for t in {tuple(d.items()) for d in data}]
-            data = sorted(data, key=lambda k: k['datetime']) # sort by datetime
+            es = fetch_es(serial, s3_data)
+            #print("Back to list")
+            #s3_data = [json.loads(x) for x in s3_data]
+            print("Sorting")
+            s3_data=list(s3_data)
+            s3_data.sort(key=lambda k: json.loads(k)['datetime'])
             print(f"Writing {serial} to s3")
-            write_s3(serial, data, launch_sites)
+            write_s3(serial, s3_data, launch_sites)
             print(f"{serial} done")
 
diff --git a/lambda/historic_es_to_s3/__main__.py b/lambda/historic_es_to_s3/__main__.py
index cb00287..b855860 100644
--- a/lambda/historic_es_to_s3/__main__.py
+++ b/lambda/historic_es_to_s3/__main__.py
@@ -5,7 +5,7 @@ print(handler(
     {
         "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
         "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
-        "body": "T1310402",
+        "body": "107-2-11206",
         "attributes": {
             "ApproximateReceiveCount": "1",
             "SentTimestamp": "1627873604999",
diff --git a/sqs_to_elk.tf b/sqs_to_elk.tf
index 16aa0a4..81598c8 100644
--- a/sqs_to_elk.tf
+++ b/sqs_to_elk.tf
@@ -97,9 +97,10 @@ resource "aws_sqs_queue" "sqs_to_elk" {
   redrive_policy = jsonencode(
     {
       deadLetterTargetArn = "arn:aws:sqs:us-east-1:143841941773:to-elk-dlq"
-      maxReceiveCount     = 100
+      maxReceiveCount     = 10
     }
   )
+  visibility_timeout_seconds = 3600
 }
 
 resource "aws_sqs_queue_policy" "sqs_to_elk" {
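
For context on the main change in __init__.py: instead of concatenating lists of
dicts and deduplicating via tuple(d.items()), the handler now keeps each telemetry
row as a canonical JSON string in a set, so rows from S3 and Elasticsearch
deduplicate as they are added. A minimal sketch of that approach, with hypothetical
rows standing in for the fetch_s3()/fetch_es() results:

    import json

    def add_unique(s3_data, es_rows):
        # sort_keys=True gives a canonical serialisation, so identical rows
        # collapse to the same set member regardless of original key order.
        for row in es_rows:
            s3_data.add(json.dumps(row, sort_keys=True))

    # Hypothetical rows; in the lambda these come from fetch_s3() and fetch_es().
    s3_rows = [{"serial": "107-2-11206", "datetime": "2022-03-05T00:00:10Z", "alt": 120.0}]
    es_rows = [
        {"datetime": "2022-03-05T00:00:10Z", "alt": 120.0, "serial": "107-2-11206"},  # duplicate, different key order
        {"serial": "107-2-11206", "datetime": "2022-03-05T00:00:20Z", "alt": 150.0},
    ]

    s3_data = {json.dumps(x, sort_keys=True) for x in s3_rows}  # convert to a set
    add_unique(s3_data, es_rows)

    # Rows stay serialised; sorting parses each string only to read the sort key.
    s3_data = sorted(s3_data, key=lambda k: json.loads(k)["datetime"])
    assert len(s3_data) == 2

The trade-off is repeated json.loads calls while sorting and in write_s3, in
exchange for never holding a second fully-parsed copy of the flight in memory.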
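
write_s3 now assembles the gzipped serial/{serial}.json.gz body by string
concatenation, splicing the launch-site fields into each already-serialised row by
replacing its closing brace. A rough sketch of that splice, under the same
assumption the patch makes — that the interpolated launch-site values render as
valid JSON (the f-string does not pass them through json.dumps, so a plain string
value would need its own quoting); the inputs below are hypothetical:

    import gzip
    import json

    # Hypothetical inputs; in the lambda, `data` is the sorted list of JSON
    # strings from the dedupe step and `append_data` comes from fetch_launch_sites().
    data = [
        '{"alt": 120.0, "datetime": "2022-03-05T00:00:10Z"}',
        '{"alt": 150.0, "datetime": "2022-03-05T00:00:20Z"}',
    ]
    append_data = '"launch_site": "-34.95, 138.52", "launch_site_range_estimate": 1000'

    if append_data:
        # Drop each row's trailing '}', append the extra fields, close it again.
        data = [x[:-1] + ", " + append_data + "}" for x in data]
    output = "[" + ",".join(data) + "]"

    json.loads(output)  # sanity check: the spliced array is still valid JSON
    body = gzip.compress(output.encode("utf-8"))  # uploaded with ContentEncoding='gzip'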