add main for rollup

This commit is contained in:
xssfox 2024-04-26 19:17:23 +10:00
parent 70d001798c
commit 5dda39ddc6
2 changed files with 38 additions and 26 deletions

View File

@ -10,15 +10,14 @@ s3 = boto3.resource('s3')
serials = {} serials = {}
def fetch_es(): def fetch_es(index=f"predictions-*,-predictions-{datetime.now().strftime('%Y-%m')},-predictions-{(datetime.now() - timedelta(days=27)).strftime('%Y-%m')},-predictions-*-rollup/_search"):
payload = { payload = {
"size": 1000 "size": 1000
} }
data = [] data = []
indexes = [] indexes = []
response = es.request(json.dumps(payload), response = es.request(json.dumps(payload),
#f"predictions-*,-predictions-{datetime.now().strftime('%Y-%m')},-predictions-{(datetime.now() - timedelta(days=27)).strftime('%Y-%m')},-predictions-*-rollup/_search", index,
"predictions-2021-12/_search",
"POST", params={"scroll": "1m"}) "POST", params={"scroll": "1m"})
try: try:
add_unique([x["_source"] for x in response['hits']['hits']]) add_unique([x["_source"] for x in response['hits']['hits']])
@ -56,7 +55,6 @@ def fetch_es():
result = es.request(body, f"_bulk", "POST") result = es.request(body, f"_bulk", "POST")
if 'errors' in result and result['errors'] == True: if 'errors' in result and result['errors'] == True:
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
print(event)
print(result) print(result)
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
if error_types: if error_types:

View File

@ -1,23 +1,37 @@
from . import * from . import *
print(handler( # print(handler(
{ # {
"Records": [ # "Records": [
{ # {
"messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c", # "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
"receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==", # "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
"body": "R5130039", # "body": "R5130039",
"attributes": { # "attributes": {
"ApproximateReceiveCount": "1", # "ApproximateReceiveCount": "1",
"SentTimestamp": "1627873604999", # "SentTimestamp": "1627873604999",
"SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update", # "SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
"ApproximateFirstReceiveTimestamp": "1627873751266" # "ApproximateFirstReceiveTimestamp": "1627873751266"
}, # },
"messageAttributes": {}, # "messageAttributes": {},
"md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f", # "md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
"md5OfMessageAttributes": None, # "md5OfMessageAttributes": None,
"eventSource": "aws:sqs", # "eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history", # "eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
"awsRegion": "us-east-1" # "awsRegion": "us-east-1"
} # }
] # ]
}, {})) # }, {}))
def find_indexes():
    """Return the names of the prediction indexes to roll up.

    Sends a GET through ``es.request`` to the ``_cat/indices`` endpoint for
    every index matching ``predictions-*``, excluding those matching
    ``predictions-*-rollup``, and asks for the listing as JSON.

    Returns:
        list: the ``index`` field of every entry in the response.
    """
    response = es.request(
        "",
        "_cat/indices/predictions-*,-predictions-*-rollup?format=json",
        "GET",
    )
    # The debugging print that used to follow the return could never run.
    return [x['index'] for x in response]
# Script entry: roll up each prediction index reported by find_indexes(),
# one index per fetch_es() call.
indexes = find_indexes()
for index in indexes:
    print(f"Doing rollup {index}")
    # fetch_es() takes the request path, so point it at this index's search
    # endpoint.
    search_path = index + "/_search"
    fetch_es(search_path)
    # NOTE(review): this only rebinds `serials` in this module. If the helpers
    # pulled in by `from . import *` keep their own module-level `serials`,
    # that one is not reset here - confirm whether `serials.clear()` was meant.
    serials = {}