# 2021-08-01 23:47:51 +10:00  (VCS timestamp artifact, kept as a comment)
import json
import boto3
import botocore . credentials
from botocore . awsrequest import AWSRequest
from botocore . endpoint import URLLib3Session
from botocore . auth import SigV4Auth
import zlib
import base64
import datetime
import os
# 2021-09-12 23:25:34 +10:00  (VCS timestamp artifact, kept as a comment)
import gzip
from io import BytesIO
# 2021-08-01 23:47:51 +10:00  (VCS timestamp artifact, kept as a comment)
# 2021-11-29 21:09:32 +11:00  (VCS timestamp artifact, kept as a comment)
from multiprocessing import Process
# Module-level HTTP session reused across es_request() calls so the
# underlying connection pool survives between Lambda invocations.
http_session = URLLib3Session()
def mirror(path, params):
    """Replay an already-signed-and-compressed ES request against the
    secondary (mirror) Elasticsearch cluster.

    Intended to run in a child process (see es_request) so the mirror
    write adds no latency to the primary request. The response is
    deliberately ignored: nothing inspects it.

    Args:
        path: ES request path, e.g. "telm-*/_search".
        params: gzip-compressed JSON body (Content-Encoding is fixed to
            gzip, so the caller must pass compressed bytes).
    """
    host = "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com"
    headers = {
        "Host": host,
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    }
    request = AWSRequest(
        method="POST", url=f"https://{host}/{path}", data=params, headers=headers
    )
    SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
    # Fresh session per call: this function executes in its own process,
    # so it must not share the parent's pooled http_session.
    session = URLLib3Session()
    session.send(request.prepare())
# Primary Elasticsearch endpoint hostname, supplied via the ES env var
# (e.g. "search-....us-east-1.es.amazonaws.com").
HOST = os.getenv("ES")

# SQS client used by handler() to enqueue serials for history updates.
sqs = boto3.client("sqs", region_name="us-east-1")
def batch(iterable, n=1):
    """Yield successive slices of *iterable*, each holding at most *n* items.

    The final slice may be shorter than *n* when len(iterable) is not a
    multiple of *n*. Works on any sliceable sequence (list, str, tuple...).
    """
    total = len(iterable)
    for start in range(0, total, n):
        # Slicing clamps at the sequence end, so no explicit min() needed.
        yield iterable[start:start + n]
def es_request(payload, path, method):
    """Send a signed, gzip-compressed request to the primary ES cluster.

    The same body is also forwarded to the mirror cluster in a
    fire-and-forget child process.

    Args:
        payload: JSON-serialisable request body (dict).
        path: ES request path, e.g. "telm-*/_search".
        method: HTTP method string ("POST", "GET", ...).

    Returns:
        The response body decoded from JSON (dict).
    """
    params = json.dumps(payload)

    # ES accepts gzip-compressed request bodies when Content-Encoding
    # is set; compressing cuts payload size for large queries.
    compressed = BytesIO()
    with gzip.GzipFile(fileobj=compressed, mode="w") as f:
        f.write(params.encode("utf-8"))
    params = compressed.getvalue()

    headers = {
        "Host": HOST,
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    }
    request = AWSRequest(
        method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
    )
    SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)

    # Mirror the compressed body to the secondary cluster without blocking.
    # NOTE(review): the child process is never joined; acceptable in a
    # short-lived Lambda, but can accumulate zombies elsewhere — confirm.
    Process(target=mirror, args=(path, params)).start()

    r = http_session.send(request.prepare())
    return json.loads(r.text)
def handler(event, context):
    """Lambda entry point: queue recently-seen sonde serials for update.

    Aggregates the distinct serials seen in the last 24 hours from the
    telm-* indices, then pushes them onto the update-history SQS queue in
    groups of 10 (the SQS send_message_batch maximum).

    Args:
        event: Lambda event (unused).
        context: Lambda context (unused).

    Returns:
        The list of serials that were queued.
    """
    query = {
        "aggs": {
            "serials": {
                "terms": {
                    "field": "serial.keyword",
                    # ES terms-aggregation cap; returns up to 10k serials.
                    "size": 10000
                }
            }
        },
        "size": 0,
        "_source": {
            "excludes": []
        },
        "query": {
            "bool": {
                # "xxxxxxxx" is a known-bogus placeholder serial.
                "must_not": [{"match_phrase": {"serial": "xxxxxxxx"}}],
                "filter": [
                    {
                        "range": {
                            "datetime": {
                                "gte": "now-24h",
                                "format": "strict_date_optional_time"
                            }
                        }
                    }
                ]
            }
        }
    }

    results = es_request(query, "telm-*/_search", "POST")
    serials = [bucket["key"] for bucket in results["aggregations"]["serials"]["buckets"]]

    for serial_batch in batch(serials, 10):
        sqs.send_message_batch(
            QueueUrl="https://sqs.us-east-1.amazonaws.com/143841941773/update-history",
            Entries=[
                # enumerate gives a unique O(1) batch-entry Id;
                # list.index() was O(n) per entry and collides on duplicates.
                {
                    "Id": str(i),
                    "MessageBody": serial
                }
                for i, serial in enumerate(serial_batch)
            ]
        )

    # Return the list already computed above instead of rebuilding it.
    return serials
# CLI entry point for local testing: the original compared against the
# space-padded literal " __main__ ", which can never match, so the guard
# body was unreachable when running the script directly.
if __name__ == "__main__":
    print(handler({}, {}))
# This script finds the list of sondes seen in the last 24 hours (matching the
# "now-24h" query filter above) and adds them to the update-history SQS queue
# to be updated (including the first and last date each was seen).