history work from a couple of days ago, and further optimizations

Michaela 2021-08-12 17:46:06 +10:00
parent 8d2fbcb164
commit b4360d1290
6 changed files with 226 additions and 20 deletions

@@ -0,0 +1,135 @@
data "archive_file" "historic_to_s3" {
type = "zip"
source_file = "historic/historic_es_to_s3/index.py"
output_path = "${path.module}/build/historic_to_s3.zip"
}
data "archive_file" "queue_data_update" {
type = "zip"
source_file = "historic/queue_data_update/index.py"
output_path = "${path.module}/build/queue_data_update.zip"
}
resource "aws_iam_role" "historic" {
path = "/service-role/"
name = "historic"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
EOF
max_session_duration = 3600
}
resource "aws_iam_role_policy" "historic" {
name = "historic"
role = aws_iam_role.historic.name
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
]
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "sqs:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
}
]
}
EOF
}
resource "aws_lambda_function" "historic_to_s3" {
function_name = "historic_to_s3"
handler = "index.handler"
filename = "${path.module}/build/historic_to_s3.zip"
source_code_hash = data.archive_file.historic_to_s3.output_base64sha256
publish = true
memory_size = 512
role = aws_iam_role.historic.arn
runtime = "python3.8"
timeout = 30
reserved_concurrent_executions = 8
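# cap this function at 8 concurrent executions, presumably to limit load on the ES cluster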
environment {
variables = {
"ES" = aws_route53_record.Route53RecordSet7.fqdn
}
}
}
resource "aws_lambda_function" "queue_data_update" {
function_name = "queue_data_update"
handler = "index.handler"
filename = "${path.module}/build/queue_data_update.zip"
source_code_hash = data.archive_file.queue_data_update.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.historic.arn
runtime = "python3.8"
timeout = 30
reserved_concurrent_executions = 1
environment {
variables = {
"ES" = aws_route53_record.Route53RecordSet7.fqdn
}
}
}
resource "aws_lambda_event_source_mapping" "historic_to_s3" {
event_source_arn = "arn:aws:sqs:us-east-1:143841941773:update-history"
function_name = aws_lambda_function.historic_to_s3.arn
batch_size = 1
maximum_batching_window_in_seconds = 30
}
resource "aws_cloudwatch_event_rule" "history" {
name = "history_queue"
description = "History Queue"
schedule_expression = "cron(0 13,20,3,9 * * ? *)"
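# i.e. fires at 13:00, 20:00, 03:00 and 09:00 UTC each day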
}
resource "aws_cloudwatch_event_target" "sns" {
rule = aws_cloudwatch_event_rule.history.name
target_id = "SendToLambda"
arn = aws_lambda_function.queue_data_update.arn
}
resource "aws_lambda_permission" "history_cron" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.queue_data_update.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.history.arn
}
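
For a quick manual test of this pipeline, a single serial can be dropped straight onto the update-history queue so that historic_to_s3 picks it up. A minimal boto3 sketch (queue URL and raw-serial body format taken from the event source mapping and handler in this commit; the serial shown is just an example):

import boto3

# push one serial onto the update-history queue; the Lambda reads the raw serial from the record body
sqs = boto3.client("sqs", region_name="us-east-1")
sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/143841941773/update-history",
    MessageBody="S2710639",
)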

@@ -19,7 +19,7 @@ def es_request(payload, path, method, params=None):
headers = {"Host": HOST, "Content-Type": "application/json"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers, params=params
method=method, url=f"https://{HOST}/{path}", data=payload, headers=headers, params=params
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
@@ -56,13 +56,23 @@ def fetch_es(serial):
data = []
response = es_request(json.dumps(payload),
"telm-*/_search", "POST", params={"scroll": "1m"})
data += [x["_source"] for x in response['hits']['hits']]
try:
data += [x["_source"] for x in response['hits']['hits']]
except:
print(response)
raise
scroll_id = response['_scroll_id']
scroll_ids = [scroll_id]
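# keep pulling pages via the scroll API until an empty page of hits comes back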
while response['hits']['hits']:
response = es_request(json.dumps({"scroll": "1m", "scroll_id": scroll_id }),
"_search/scroll", "POST")
scroll_id = response['_scroll_id']
scroll_ids.append(scroll_id)
data += [x["_source"] for x in response['hits']['hits']]
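# clear the scroll contexts so they don't linger on the cluster until the 1m timeout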
for scroll_id in scroll_ids:
scroll_delete = es_request(json.dumps({"scroll_id": scroll_id }),
"_search/scroll", "DELETE")
print(scroll_delete)
return data
def fetch_s3(serial):
@@ -105,7 +115,8 @@ def write_s3(serial, data):
object = s3.Object(BUCKET,f'serial/{serial}.json.gz')
object.put(
Body=gz_data,
ContentType='application/x-gzip',
ContentType='application/json',
ContentEncoding='gzip',
Metadata={
"first-lat": str(summary[0]['lat']),
"first-lon": str(summary[0]['lon']),
@@ -119,19 +130,44 @@ def write_s3(serial, data):
}
)
def lambda_handler(event, context):
def handler(event, context):
print(json.dumps(event))
payloads = {}
for record in event['Records']:
sns_message = json.loads(record["body"])
serial = sns_message["Message"]
serial = record["body"]
print(f"Getting {serial} S3")
s3_data = fetch_s3(serial)
print(f"Getting {serial} ES")
es = fetch_es(serial)
print(f"Combining data {serial}")
data = s3_data + es
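# drop duplicate records by round-tripping each dict through a hashable tuple of its items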
data = [dict(t) for t in {tuple(d.items()) for d in data}]
data = sorted(data, key=lambda k: k['datetime']) # sort by datetime
print(f"Writing {serial} to s3")
write_s3(serial, data)
print(f"{serial} done")
if __name__ == "__main__":
print(lambda_handler(
{'Records': [{"body": "{\"Message\":\"S4520727\"}"}]}, {}))
print(handler(
{
"Records": [
{
"messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
"receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
"body": "S2710639",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1627873604999",
"SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
"ApproximateFirstReceiveTimestamp": "1627873751266"
},
"messageAttributes": {},
"md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
"md5OfMessageAttributes": None,
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
"awsRegion": "us-east-1"
}
]
}, {}))

@@ -25,7 +25,7 @@ def es_request(payload, path, method):
params = json.dumps(payload)
headers = {"Host": HOST, "Content-Type": "application/json"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
@@ -41,7 +41,7 @@ def handler(event, context):
"serials": {
"terms": {
"field": "serial.keyword",
"size": 5000
"size": 10000
}
}
},
@@ -56,8 +56,7 @@ def handler(event, context):
{
"range": {
"datetime": {
"gte": "2021-07-23T06:45:17.308Z",
"lte": "2021-07-25T06:45:17.308Z",
"gte": "now-24h",
"format": "strict_date_optional_time"
}
}
@@ -66,6 +65,7 @@ def handler(event, context):
}
}
}
results = es_request(query, "telm-*/_search", "POST")
serials = [ x['key'] for x in results['aggregations']['serials']['buckets'] ]
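# SQS SendMessageBatch accepts at most 10 entries per call, so queue the serials in batches of 10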
for serial_batch in batch(serials, 10):
@@ -73,7 +73,7 @@ def handler(event, context):
QueueUrl="https://sqs.us-east-1.amazonaws.com/143841941773/update-history",
Entries=[
{
"Id": x,
"Id": str(serial_batch.index(x)),
"MessageBody": x
}
for x in serial_batch]

@@ -3,18 +3,33 @@ import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
import json
import os
from datetime import datetime, timedelta, timezone
import sys, traceback
import uuid
# TODO: HEAD the S3 object; if it's less than 24 hours old check ES, else 302 to the bucket
HOST = os.getenv("ES")
# get current sondes, filter by date, location
s3 = boto3.resource('s3')
def history(event, context):
s3 = boto3.resource('s3')
serial = str(event["pathParameters"]["serial"])
# if there's a historic file created for this sonde, use that instead
try:
object = s3.Object('sondehub-history', f'serial/{serial}.json.gz')
object.load()
return {"statusCode": 302, "headers": {"Location": f'https://{object.bucket_name}.s3.amazonaws.com/{object.key}'}}
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
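# no archived copy yet; fall through and query ES directly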
pass
else:
# Something else has gone wrong.
raise
path = "telm-*/_search"
payload = {
"aggs": {
@@ -80,7 +95,7 @@ def es_request(payload, path, method):
if __name__ == "__main__":
print(
history(
{"pathParameters": {"serial": "S3530044"}}, {}
{"pathParameters": {"serial": "N4750769"}}, {}
)
)

main.tf

@@ -738,6 +738,7 @@ resource "aws_lambda_function" "history" {
role = aws_iam_role.IAMRole5.arn
runtime = "python3.7"
timeout = 30
reserved_concurrent_executions = 4
environment {
variables = {
"ES" = "es.${local.domain_name}"
@@ -1152,13 +1153,23 @@ EOF
ebs_options {
ebs_enabled = true
volume_type = "gp2"
volume_size = 250
volume_size = 500
}
log_publishing_options {
cloudwatch_log_group_arn = "arn:aws:logs:us-east-1:143841941773:log-group:/aws/aes/domains/sondes-v2/application-logs"
enabled = true
log_type = "ES_APPLICATION_LOGS"
}
log_publishing_options {
cloudwatch_log_group_arn = "arn:aws:logs:us-east-1:143841941773:log-group:/aws/aes/domains/sondes-v2/index-logs"
enabled = true
log_type = "INDEX_SLOW_LOGS"
}
log_publishing_options {
cloudwatch_log_group_arn = "arn:aws:logs:us-east-1:143841941773:log-group:/aws/aes/domains/sondes-v2/search-logs"
enabled = true
log_type = "SEARCH_SLOW_LOGS"
}
}
data "aws_kms_key" "es" {
key_id = "alias/aws/es"

@@ -45,7 +45,10 @@ def lambda_handler(event, context):
incoming_payloads = [json.loads(sns_message["Message"])]
else:
incoming_payloads = json.loads(sns_message["Message"])
for payload in incoming_payloads:
# send only the first packet, the last packet, and every 5th packet in between
payloads = [incoming_payloads[0]] + incoming_payloads[1:-1:5][1:] + [incoming_payloads[-1]]
for payload in payloads:
body = json.dumps(payload)
@@ -58,7 +61,13 @@ def lambda_handler(event, context):
qos=0,
retain=False
)
time.sleep(0.1) # give paho mqtt 100ms to send messages this could be improved on but paho mqtt is a pain to interface with
client.publish(
topic=f'batch',
payload=json.dumps(payloads),
qos=0,
retain=False
)
time.sleep(0.05) # give paho mqtt 50ms to send messages; this could be improved on, but paho mqtt is a pain to interface with