diff --git a/lambda/rollup_predictions/__init__.py b/lambda/rollup_predictions/__init__.py
index ca11719..6a2ec3e 100644
--- a/lambda/rollup_predictions/__init__.py
+++ b/lambda/rollup_predictions/__init__.py
@@ -10,15 +10,20 @@
 s3 = boto3.resource('s3')
 serials = {}
-def fetch_es():
+def fetch_es(index=None):
+    # Build the default index pattern at call time: a default argument is
+    # evaluated once at import, so a warm Lambda would keep a stale month.
+    if index is None:
+        index = (f"predictions-*,-predictions-{datetime.now().strftime('%Y-%m')},"
+                 f"-predictions-{(datetime.now() - timedelta(days=27)).strftime('%Y-%m')},"
+                 f"-predictions-*-rollup/_search")
     payload = {
         "size": 1000
     }
     data = []
     indexes = []
     response = es.request(json.dumps(payload),
-        #f"predictions-*,-predictions-{datetime.now().strftime('%Y-%m')},-predictions-{(datetime.now() - timedelta(days=27)).strftime('%Y-%m')},-predictions-*-rollup/_search",
-        "predictions-2021-12/_search",
+        index,
         "POST",
         params={"scroll": "1m"})
     try:
         add_unique([x["_source"] for x in response['hits']['hits']])
@@ -56,7 +61,6 @@ def fetch_es():
         result = es.request(body, f"_bulk", "POST")
         if 'errors' in result and result['errors'] == True:
             error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
-            print(event)
             print(result)
             error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
             if error_types:
diff --git a/lambda/rollup_predictions/__main__.py b/lambda/rollup_predictions/__main__.py
index 8f26898..18f3f99 100644
--- a/lambda/rollup_predictions/__main__.py
+++ b/lambda/rollup_predictions/__main__.py
@@ -1,23 +1,38 @@
 from . import *
-print(handler(
-    {
-        "Records": [
-            {
-                "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
-                "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
-                "body": "R5130039",
-                "attributes": {
-                    "ApproximateReceiveCount": "1",
-                    "SentTimestamp": "1627873604999",
-                    "SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
-                    "ApproximateFirstReceiveTimestamp": "1627873751266"
-                },
-                "messageAttributes": {},
-                "md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
-                "md5OfMessageAttributes": None,
-                "eventSource": "aws:sqs",
-                "eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
-                "awsRegion": "us-east-1"
-            }
-        ]
-}, {}))
\ No newline at end of file
+# print(handler(
+#     {
+#         "Records": [
+#             {
+#                 "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
+#                 "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
+#                 "body": "R5130039",
+#                 "attributes": {
+#                     "ApproximateReceiveCount": "1",
+#                     "SentTimestamp": "1627873604999",
+#                     "SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
+#                     "ApproximateFirstReceiveTimestamp": "1627873751266"
+#                 },
+#                 "messageAttributes": {},
+#                 "md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
+#                 "md5OfMessageAttributes": None,
+#                 "eventSource": "aws:sqs",
+#                 "eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
"arn:aws:sqs:us-east-1:143841941773:update-history", +# "awsRegion": "us-east-1" +# } +# ] +# }, {})) + +def find_indexes(): + + response = es.request("", + "_cat/indices/predictions-*,-predictions-*-rollup?format=json", + "GET") + return [x['index'] for x in response] + print(response) + +indexes = find_indexes() +for index in indexes: + print(f"Doing rollup {index}") + fetch_es(index+"/_search") + serials = {}