Logging reduction

xss 2022-11-16 13:27:34 +11:00
parent b36c6f6ab6
commit ce981593ec
5 changed files with 278 additions and 64 deletions

View File

@@ -1,3 +1,100 @@
+resource "aws_iam_role" "ingestion_lambda_role" { # need a specific role so that we can disable CloudWatch logs
+  path = "/service-role/"
+  name_prefix = "sonde-ingestion-"
+  assume_role_policy = <<EOF
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Principal": {
+        "Service": "lambda.amazonaws.com"
+      },
+      "Action": "sts:AssumeRole"
+    },
+    {
+      "Effect": "Allow",
+      "Principal": {
+        "Service": "edgelambda.amazonaws.com"
+      },
+      "Action": "sts:AssumeRole"
+    }
+  ]
+}
+EOF
+  max_session_duration = 3600
+}
+
+resource "aws_iam_role_policy" "ingestion_lambda_role" {
+  policy = <<EOF
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "s3:*",
+      "Resource": "*"
+    },
+    {
+      "Effect": "Allow",
+      "Action": "sns:*",
+      "Resource": "*"
+    },
+    {
+      "Effect": "Allow",
+      "Action": "logs:CreateLogGroup",
+      "Resource": "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:*"
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "logs:CreateLogStream",
+        "logs:PutLogEvents"
+      ],
+      "Resource": [
+        "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/ingestion",
+        "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/sns_to_mqtt"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "logs:CreateLogStream",
+        "logs:PutLogEvents"
+      ],
+      "Resource": [
+        "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/ingestion*",
+        "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/sns_to_mqtt*"
+      ]
+    },
+    {
+      "Action": [
+        "ec2:DescribeNetworkInterfaces",
+        "ec2:CreateNetworkInterface",
+        "ec2:DeleteNetworkInterface",
+        "ec2:DescribeInstances",
+        "ec2:AttachNetworkInterface"
+      ],
+      "Effect": "Allow",
+      "Resource": "*"
+    }
+  ]
+}
+EOF
+  role = aws_iam_role.ingestion_lambda_role.name
+}
+
+resource "aws_cloudwatch_log_group" "ingestion" {
+  name = "/ingestion"
+}
+
+resource "aws_cloudwatch_log_group" "sns_to_mqtt" {
+  name = "/sns_to_mqtt"
+}
+
 resource "aws_lambda_function" "upload_telem" {
   function_name = "sonde-api-to-iot-core"
   handler       = "sonde_api_to_iot_core.lambda_handler"
@@ -6,7 +103,7 @@ resource "aws_lambda_function" "upload_telem" {
   source_code_hash = data.archive_file.lambda.output_base64sha256
   publish          = true
   memory_size      = 128
-  role             = aws_iam_role.basic_lambda_role.arn
+  role             = aws_iam_role.ingestion_lambda_role.arn
   runtime          = "python3.9"
   timeout          = 30
   architectures    = ["arm64"]
@@ -140,7 +237,7 @@ resource "aws_lambda_function" "sns_to_mqtt" {
   source_code_hash = data.archive_file.lambda.output_base64sha256
   publish          = true
   memory_size      = 128
-  role             = aws_iam_role.basic_lambda_role.arn
+  role             = aws_iam_role.ingestion_lambda_role.arn
   runtime          = "python3.9"
   timeout          = 3
   architectures    = ["arm64"]
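
The net effect of the policy above is the actual "logging reduction": the functions keep logs:CreateLogStream and logs:PutLogEvents on the explicit /ingestion and /sns_to_mqtt groups, but not on the default /aws/lambda/<function> groups, so the runtime's automatic per-invocation logging is silently dropped. A minimal probe of that split, assuming it runs inside one of these Lambdas with ingestion_lambda_role attached (the stream name and messages are invented, not part of the commit):

import time
import boto3
from botocore.exceptions import ClientError

logs = boto3.client("logs")

def probe(group):
    """Try to write one event to `group`; return 'allowed' or the denial code."""
    try:
        try:
            logs.create_log_stream(logGroupName=group, logStreamName="probe")
        except logs.exceptions.ResourceAlreadyExistsException:
            pass  # stream left over from an earlier probe
        # note: repeat writes to the same stream needed a sequence token in 2022
        logs.put_log_events(
            logGroupName=group,
            logStreamName="probe",
            logEvents=[{"timestamp": time.time_ns() // 1_000_000,  # ms since epoch
                        "message": "probe"}],
        )
        return "allowed"
    except ClientError as err:
        return err.response["Error"]["Code"]

print(probe("/ingestion"))                         # allowed by the role policy
print(probe("/aws/lambda/sonde-api-to-iot-core"))  # e.g. AccessDeniedException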

View File

@@ -7,6 +7,10 @@ import time
 import random
 import zlib
 import base64
+import boto3
+import traceback
+import sys
+import uuid
 client = mqtt.Client(transport="websockets")
@@ -15,6 +19,48 @@ connected_flag = False
 import socket
 socket.setdefaulttimeout(1)
+logs = boto3.client('logs')
+sequenceToken = None
+log_stream_name = str(uuid.uuid4())  # one log stream per Lambda container
+
+def handle_error(message, event, stream_name):
+    global sequenceToken
+    print(message)
+    events = [
+        {
+            'timestamp': time.time_ns() // 1_000_000,  # put_log_events expects ms since epoch
+            'message': message
+        },
+    ]
+    if event:
+        events.insert(0, {
+            'timestamp': time.time_ns() // 1_000_000,
+            'message': json.dumps(event)
+        })
+    if sequenceToken:
+        response = logs.put_log_events(
+            logGroupName='/sns_to_mqtt',
+            logStreamName=stream_name,
+            logEvents=events,
+            sequenceToken=sequenceToken
+        )
+        sequenceToken = response['nextSequenceToken']
+    else:
+        try:
+            log_stream = logs.create_log_stream(
+                logGroupName='/sns_to_mqtt',
+                logStreamName=stream_name
+            )
+        except:  # ignore failures here - the log stream probably already exists
+            pass
+        response = logs.put_log_events(
+            logGroupName='/sns_to_mqtt',
+            logStreamName=stream_name,
+            logEvents=events
+        )
+        sequenceToken = response['nextSequenceToken']
+    print(sequenceToken)
+
 def connect():
     client.on_connect = on_connect
     client.on_disconnect = on_disconnect
@@ -23,61 +69,69 @@ def connect():
     client.username_pw_set(username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
     HOSTS = os.getenv("MQTT_HOST").split(",")
     HOST = random.choice(HOSTS)
-    print(f"Connecting to {HOST}")
+    print(f"Connecting to {HOST}", None, log_stream_name)
     client.connect(HOST, 8080, 5)
     client.loop_start()
-    print("loop started")
+    print("loop started", None, log_stream_name)
 
 def on_disconnect(client, userdata, rc):
     global connected_flag
-    print("disconnected")
+    print("disconnected", None, log_stream_name)
     connected_flag = False  # set flag
 
 def on_connect(client, userdata, flags, rc):
     global connected_flag
     if rc == 0:
-        print("connected")
+        print("connected", None, log_stream_name)
         connected_flag = True  # set flag
     else:
-        print("Bad connection Returned code=", rc)
+        print("Bad connection Returned code=", rc, None, log_stream_name)
 
 def on_publish(client, userdata, mid):
     pass
 
-connect()
+try:
+    connect()
+except:
+    exc_type, exc_value, exc_traceback = sys.exc_info()
+    handle_error("".join(traceback.format_exception(exc_type, exc_value, exc_traceback)), None, log_stream_name)
 
 def lambda_handler(event, context):
-    client.loop(timeout=0.05, max_packets=1)  # make sure it reconnects
-    for record in event['Records']:
-        sns_message = record["Sns"]
-        try:
-            decoded = json.loads(zlib.decompress(base64.b64decode(sns_message["Message"]), 16 + zlib.MAX_WBITS))
-        except:
-            decoded = json.loads(sns_message["Message"])
-        if type(decoded) == dict:
-            incoming_payloads = [decoded]
-        else:
-            incoming_payloads = decoded
-        payloads = incoming_payloads
-        for payload in payloads:
-            body = json.dumps(payload)
-            serial = payload[os.getenv("MQTT_ID")]
-            while not connected_flag:
-                time.sleep(0.01)  # wait until connected
-            client.publish(
-                topic=f'{os.getenv("MQTT_PREFIX")}/{serial}',
-                payload=body,
-                qos=0,
-                retain=False
-            )
-        client.publish(
-            topic=os.getenv("MQTT_BATCH"),
-            payload=json.dumps(payloads),
-            qos=0,
-            retain=False
-        )
-    time.sleep(0.05)  # give paho mqtt 50ms to send messages; this could be improved on, but paho mqtt is a pain to interface with
+    try:
+        client.loop(timeout=0.05, max_packets=1)  # make sure it reconnects
+        for record in event['Records']:
+            sns_message = record["Sns"]
+            try:
+                decoded = json.loads(zlib.decompress(base64.b64decode(sns_message["Message"]), 16 + zlib.MAX_WBITS))
+            except:
+                decoded = json.loads(sns_message["Message"])
+            if type(decoded) == dict:
+                incoming_payloads = [decoded]
+            else:
+                incoming_payloads = decoded
+            payloads = incoming_payloads
+            for payload in payloads:
+                body = json.dumps(payload)
+                serial = payload[os.getenv("MQTT_ID")]
+                while not connected_flag:
+                    time.sleep(0.01)  # wait until connected
+                client.publish(
+                    topic=f'{os.getenv("MQTT_PREFIX")}/{serial}',
+                    payload=body,
+                    qos=0,
+                    retain=False
+                )
+            client.publish(
+                topic=os.getenv("MQTT_BATCH"),
+                payload=json.dumps(payloads),
+                qos=0,
+                retain=False
+            )
+        time.sleep(0.05)  # give paho mqtt 50ms to send messages; this could be improved on, but paho mqtt is a pain to interface with
+    except:
+        exc_type, exc_value, exc_traceback = sys.exc_info()
+        handle_error("".join(traceback.format_exception(exc_type, exc_value, exc_traceback)), event, log_stream_name)

View File

@@ -1,4 +1,9 @@
 from . import *
+import uuid
+
+class fakeContext:
+    def __init__(self):
+        self.log_stream_name = str(uuid.uuid4())
+
 # test event
 ###########
 if __name__ == "__main__":
@@ -23,6 +28,6 @@ if __name__ == "__main__":
             }
         ]
     }
-    print(lambda_handler(demo_event, {}))
+    print(lambda_handler(demo_event, fakeContext()))

View File

@@ -9,11 +9,62 @@ import re
 from math import radians, degrees, sin, cos, atan2, sqrt, pi
 import statistics
 from collections import defaultdict
+import time
+import traceback
+import sys
 import base64
 import gzip
 from io import BytesIO
+
+logs = boto3.client('logs')
+sequenceToken = None
+
+def handle_error(message, event, stream_name):
+    global sequenceToken
+    print(message)
+    if sequenceToken:
+        response = logs.put_log_events(
+            logGroupName='/ingestion',
+            logStreamName=stream_name,
+            logEvents=[
+                {
+                    'timestamp': time.time_ns() // 1_000_000,  # put_log_events expects ms since epoch
+                    'message': json.dumps(event)
+                },
+                {
+                    'timestamp': time.time_ns() // 1_000_000,
+                    'message': message
+                },
+            ],
+            sequenceToken=sequenceToken
+        )
+        sequenceToken = response['nextSequenceToken']
+    else:
+        try:
+            log_stream = logs.create_log_stream(
+                logGroupName='/ingestion',
+                logStreamName=stream_name
+            )
+        except:  # ignore failures here - the log stream probably already exists
+            pass
+        response = logs.put_log_events(
+            logGroupName='/ingestion',
+            logStreamName=stream_name,
+            logEvents=[
+                {
+                    'timestamp': time.time_ns() // 1_000_000,
+                    'message': json.dumps(event)
+                },
+                {
+                    'timestamp': time.time_ns() // 1_000_000,
+                    'message': message
+                },
+            ]
+        )
+        sequenceToken = response['nextSequenceToken']
+    print(sequenceToken)
+
 def z_check(data, threshold):
     outliers = []
     mean = statistics.mean(data)
@@ -267,7 +318,7 @@ def post(payload):
         Message=payload
     )
 
-def upload(event, context):
+def upload(event, context, orig_event):
     if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
         event["body"] = base64.b64decode(event["body"])
     if (
@@ -306,7 +357,7 @@ def upload(event, context):
         lon_outliers = z_check(lons, 3)
         alt_outliers = z_check(alts, 3)
         if lat_outliers or lon_outliers or alt_outliers:
-            print("Outlier check detected outlier, serial:", check_data[0]["serial"])
+            handle_error(f"Outlier check detected outlier, serial: {check_data[0]['serial']}", orig_event, context.log_stream_name)
             for index in lat_outliers:
                 payload_serials[serial][index]["lat_outlier"] = True
             for index in lon_outliers:
@@ -375,32 +426,35 @@ def upload(event, context):
     post(to_sns)
     return errors
 
 def lambda_handler(event, context):
+    orig_event = event.copy()
     try:
-        errors = upload(event, context)
-    except zlib.error:
-        return {"statusCode": 400, "body": "Could not decompress"}
-    except json.decoder.JSONDecodeError:
-        return {"statusCode": 400, "body": "Not valid json"}
-    error_message = {
-        "message": "some or all payloads could not be processed",
-        "errors": errors
-    }
-    if errors:
-        output = {
-            "statusCode": 202,
-            "body": json.dumps(error_message),
-            "headers": {
-                "content-type": "application/json"
-            }
-        }
-        print({
-            "statusCode": 202,
-            "body": error_message,
-            "headers": {
-                "content-type": "application/json"
-            }
-        })
-        return output
-    else:
-        return {"statusCode": 200, "body": "^v^ telm logged"}
+        try:
+            errors = upload(event, context, orig_event)
+        except zlib.error:
+            response = {"statusCode": 400, "body": "Could not decompress"}
+            handle_error(json.dumps(response), orig_event, context.log_stream_name)
+            return response
+        except json.decoder.JSONDecodeError:
+            response = {"statusCode": 400, "body": "Not valid json"}
+            handle_error(json.dumps(response), orig_event, context.log_stream_name)
+            return response
+        error_message = {
+            "message": "some or all payloads could not be processed",
+            "errors": errors
+        }
+        if errors:
+            output = {
+                "statusCode": 202,
+                "body": json.dumps(error_message),
+                "headers": {
+                    "content-type": "application/json"
+                }
+            }
+            handle_error(json.dumps(output), orig_event, context.log_stream_name)
+            return output
+        else:
+            return {"statusCode": 200, "body": "^v^ telm logged"}
+    except:
+        exc_type, exc_value, exc_traceback = sys.exc_info()
+        handle_error("".join(traceback.format_exception(exc_type, exc_value, exc_traceback)), orig_event, context.log_stream_name)

View File

@@ -2,6 +2,7 @@ from . import *
 import json
 import base64
 import gzip
+import uuid
 
 body = [
     {
         "software_name": "radiosonde_auto_rx",
@@ -85,4 +86,7 @@ payload = {
     "body": bbody,
     "isBase64Encoded": True,
 }
-print(lambda_handler(payload, {}))
+
+class fakeContext:
+    def __init__(self):
+        self.log_stream_name = str(uuid.uuid4())
+
+print(lambda_handler(payload, fakeContext()))
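
Both test harnesses feed the handlers the request shape the API really receives: a JSON body that has been gzip-compressed and base64-encoded, flagged with isBase64Encoded. The diff elides how bbody is built; a plausible reconstruction (the record below is a placeholder for the full test payload), round-tripped the way the handlers decode it:

import base64
import gzip
import json
import zlib

body = [{"software_name": "radiosonde_auto_rx"}]  # stand-in for the full record
bbody = base64.b64encode(gzip.compress(json.dumps(body).encode())).decode()

# decode exactly as the handlers do: base64, then zlib with the gzip wrapper bits
decoded = json.loads(zlib.decompress(base64.b64decode(bbody), 16 + zlib.MAX_WBITS))
assert decoded == body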