mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2024-12-18 20:57:56 +00:00
Add WIP roll up
This commit is contained in:
parent
5612f9971f
commit
df07800d37
77
lambda/rollup_predictions/__init__.py
Normal file
77
lambda/rollup_predictions/__init__.py
Normal file
@ -0,0 +1,77 @@
|
||||
import json
|
||||
import boto3
|
||||
import gzip
|
||||
from botocore.exceptions import ClientError
|
||||
import es
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
# S3 resource handle, created once when the module is imported.
# NOTE(review): nothing in the code visible here uses `s3` - confirm it is still needed.
s3 = boto3.resource('s3')
|
||||
|
||||
# Newest prediction document seen so far for each serial, keyed by serial.
# Filled by add_unique() and read back by fetch_es() when building the bulk request.
# NOTE(review): module-level state - presumably survives between invocations of a warm runtime; confirm that is intended.
serials = {}
|
||||
|
||||
def fetch_es():
    """Roll the source prediction index up to one document per serial.

    Steps:
      1. Scroll through the source index 1000 hits at a time, passing every
         hit's ``_source`` to ``add_unique`` (which keeps the newest document
         per serial in the module-level ``serials`` dict) and remembering the
         ``_index`` each hit came from.
      2. Clear the scroll contexts that were opened (best effort).
      3. Bulk-index every document in ``serials`` into
         ``predictions-<first two "-"-separated fields of its datetime>-rollup``.
      4. Delete every source index that contributed a hit.

    Raises:
        RuntimeError: if the bulk request reports any error other than
            ``mapper_parsing_exception``.
    """
    payload = {"size": 1000}
    indexes = []

    # WIP: the source index is hard-coded for now. The pattern originally left
    # commented out next to it is preserved here for reference:
    # f"predictions-*,-predictions-{datetime.now().strftime('%Y-%m')},-predictions-{(datetime.now() - timedelta(days=27)).strftime('%Y-%m')},-predictions-*-rollup/_search"
    response = es.request(
        json.dumps(payload),
        "predictions-2021-12/_search",
        "POST",
        params={"scroll": "1m"},
    )
    try:
        first_hits = response['hits']['hits']
        add_unique([x["_source"] for x in first_hits])
    except Exception:
        # Dump the unexpected response to make the failure diagnosable.
        print(response)
        raise
    # The first page's indexes must be recorded too; otherwise an index whose
    # documents all fit in one page would be rolled up but never cleaned up.
    indexes += [x["_index"] for x in first_hits]

    scroll_id = response['_scroll_id']
    scroll_ids = [scroll_id]
    try:
        while response['hits']['hits']:
            print("Fetching more")
            response = es.request(
                json.dumps({"scroll": "1m", "scroll_id": scroll_id}),
                "_search/scroll",
                "POST",
            )
            scroll_id = response['_scroll_id']
            scroll_ids.append(scroll_id)
            add_unique([x["_source"] for x in response['hits']['hits']])
            indexes += [x["_index"] for x in response['hits']['hits']]
    finally:
        # Release the scroll contexts even if a page request failed. The same
        # id may be returned for several pages, so clear each one only once.
        for stale_id in dict.fromkeys(scroll_ids):
            try:
                scroll_delete = es.request(
                    json.dumps({"scroll_id": stale_id}),
                    "_search/scroll",
                    "DELETE",
                )
                print(scroll_delete)
            except RuntimeError:
                # Best effort: the scroll is requested with a 1m keep-alive.
                print("Failed to clear scroll context")

    # dedupe indexes to clean up (order preserved)
    indexes = list(dict.fromkeys(indexes))

    if not serials:
        # Nothing was collected, so there is nothing to index or delete.
        print("No predictions to roll up")
        return

    # post data to ES bulk: one action line plus one document line per serial
    bulk_lines = []
    for doc in serials.values():
        index = "predictions-" + "-".join(doc['datetime'].split("-")[0:2]) + "-rollup"
        bulk_lines.append(f'{{"index":{{"_index":"{index}"}}}}')
        bulk_lines.append(json.dumps(doc))
    body = "".join(f"{line}\n" for line in bulk_lines) + "\n"

    result = es.request(body, "_bulk", "POST")
    if 'errors' in result and result['errors']:
        # get all the error types
        error_types = [
            x['index']['error']['type']
            for x in result['items']
            if 'error' in x['index']
        ]
        print(result)
        # filter out mapper failures since they will never succeed
        error_types = [a for a in error_types if a != 'mapper_parsing_exception']
        if error_types:
            raise RuntimeError(f"Bulk roll-up failed: {error_types}")

    for index in indexes:
        if "predictions-" in index:  # safety check
            # NOTE(review): the bulk body is still passed along with the DELETE,
            # as in the original; confirm whether es.request accepts an empty
            # payload so this can be dropped.
            es.request(body, index, "DELETE")
|
||||
|
||||
def add_unique(es_r):
    """Merge rows into ``serials``, keeping the newest row for each serial.

    Each row must carry a ``serial`` key and an ISO-8601 ``datetime`` string
    (a trailing ``Z`` is accepted). A row replaces the stored one when its
    serial is new or its datetime is strictly later than the stored row's.
    The running number of distinct serials is printed once per call.
    """

    def _when(record):
        # fromisoformat() is fed "+00:00" in place of the "Z" suffix.
        return datetime.fromisoformat(record['datetime'].replace("Z", "+00:00"))

    for candidate in es_r:
        key = candidate['serial']
        if key in serials and _when(serials[key]) >= _when(candidate):
            # The stored row is at least as recent; keep it.
            continue
        serials[key] = candidate

    print(f"Number of serials: {len(serials)}")
|
||||
|
||||
def handler(event, context):
    """Log the incoming event as JSON, then run the roll-up.

    ``context`` is accepted but not used. Nothing is returned.
    """
    serialized_event = json.dumps(event)
    print(serialized_event)
    fetch_es()
|
23
lambda/rollup_predictions/__main__.py
Normal file
23
lambda/rollup_predictions/__main__.py
Normal file
@ -0,0 +1,23 @@
|
||||
from . import *
|
||||
# Sample SQS record used to exercise handler() when the package is run directly.
sqs_record = {
    "messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
    "receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
    "body": "R5130039",
    "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1627873604999",
        "SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
        "ApproximateFirstReceiveTimestamp": "1627873751266",
    },
    "messageAttributes": {},
    "md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
    "md5OfMessageAttributes": None,
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
    "awsRegion": "us-east-1",
}
sample_event = {"Records": [sqs_record]}

# Invoke the handler with the sample event and an empty context, printing its result.
print(handler(sample_event, {}))
|
Loading…
Reference in New Issue
Block a user