2021-08-01 13:47:51 +00:00
import json
import boto3
import os
2021-12-20 06:02:02 +00:00
import es
2021-11-29 10:09:32 +00:00
2021-08-01 13:47:51 +00:00
HOST = os . getenv ( " ES " )
sqs = boto3 . client ( ' sqs ' , region_name = " us-east-1 " )
def batch ( iterable , n = 1 ) :
l = len ( iterable )
for ndx in range ( 0 , l , n ) :
yield iterable [ ndx : min ( ndx + n , l ) ]
def handler ( event , context ) :
query = {
" aggs " : {
" serials " : {
" terms " : {
" field " : " serial.keyword " ,
2021-08-12 07:46:06 +00:00
" size " : 10000
2021-08-01 13:47:51 +00:00
}
}
} ,
" size " : 0 ,
" _source " : {
" excludes " : [ ]
} ,
" query " : {
" bool " : {
2023-07-11 23:59:43 +00:00
" must_not " : [ { " term " : { " serial " : " xxxxxxxx " } } ] ,
2021-08-01 13:47:51 +00:00
" filter " : [
{
" range " : {
" datetime " : {
2021-08-12 07:46:06 +00:00
" gte " : " now-24h " ,
2021-08-01 13:47:51 +00:00
" format " : " strict_date_optional_time "
}
}
}
]
}
}
}
2021-08-12 07:46:06 +00:00
2021-12-20 06:02:02 +00:00
results = es . request ( json . dumps ( query ) , " telm-*/_search " , " POST " )
2021-08-01 13:47:51 +00:00
serials = [ x [ ' key ' ] for x in results [ ' aggregations ' ] [ ' serials ' ] [ ' buckets ' ] ]
for serial_batch in batch ( serials , 10 ) :
sqs . send_message_batch (
QueueUrl = " https://sqs.us-east-1.amazonaws.com/143841941773/update-history " ,
Entries = [
{
2021-08-12 07:46:06 +00:00
" Id " : str ( serial_batch . index ( x ) ) ,
2021-08-01 13:47:51 +00:00
" MessageBody " : x
}
for x in serial_batch ]
)
return [ x [ ' key ' ] for x in results [ ' aggregations ' ] [ ' serials ' ] [ ' buckets ' ] ]
#TODO add to SQS queue
2021-12-20 06:02:02 +00:00
2021-08-01 13:47:51 +00:00
# this script will find list of sondes seen in the last 48 hours and add them to the queue to be updated (including the first and last date they were seen)