APRS importing of ham balloons

This commit is contained in:
Michaela Wheeler 2022-01-12 21:00:42 +11:00
parent 28646db615
commit b613968da7
4 changed files with 329 additions and 0 deletions

113
ham_aprs.tf Normal file
View File

@ -0,0 +1,113 @@
resource "aws_ecs_task_definition" "aprsgw" {
family = "aprsgw"
runtime_platform {
cpu_architecture = "ARM64"
}
container_definitions = jsonencode(
[
{
cpu = 0
"environment": [
{"name": "AWS_REGION", "value": "us-east-1"},
{"name": "AWS_DEFAULT_REGION", "value": "us-east-1"},
{"name": "CALLSIGN", "value": "VK3FUR"},
{"name": "SNS", "value": aws_sns_topic.ham_telem.arn}
],
essential = true
image = "143841941773.dkr.ecr.us-east-1.amazonaws.com/aprsgw:latest"
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-group = "/ecs/aprsgw"
awslogs-region = "us-east-1"
awslogs-stream-prefix = "ecs"
}
}
mountPoints = []
name = "aprsgw"
portMappings = [ ]
ulimits = []
volumesFrom = []
},
]
)
cpu = "256"
execution_role_arn = "arn:aws:iam::143841941773:role/aprsgw"
memory = "512"
network_mode = "awsvpc"
requires_compatibilities = [
"FARGATE",
]
tags = {}
task_role_arn = "arn:aws:iam::143841941773:role/aprsgw"
}
resource "aws_iam_role" "aprsgw" {
name = "aprsgw"
description = "Allows EC2 instances to call AWS services on your behalf."
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ecs-tasks.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
max_session_duration = 3600
}
resource "aws_iam_role_policy_attachment" "aprsgw" {
role = aws_iam_role.aprsgw.id
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}
resource "aws_iam_role_policy" "aprsgw" {
name = "aprsgw"
role = aws_iam_role.aprsgw.id
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "*"
}
]
}
EOF
}
resource "aws_ecs_cluster" "aprsgw" {
name = "aprsgw"
capacity_providers = ["FARGATE", "FARGATE_SPOT"]
}
resource "aws_ecs_service" "aprsgw" {
name = "aprsgw"
cluster = aws_ecs_cluster.aprsgw.id
task_definition = aws_ecs_task_definition.aprsgw.arn
enable_ecs_managed_tags = true
launch_type = "FARGATE"
platform_version = "LATEST"
desired_count = 1
network_configuration {
assign_public_ip = true
security_groups = []
subnets = [aws_subnet.public["us-east-1b"].id]
}
}

166
ham_ingestion.tf Normal file
View File

@ -0,0 +1,166 @@
resource "aws_sns_topic" "ham_telem" {
name = "ham-telem"
delivery_policy = <<EOF
{
"http": {
"defaultHealthyRetryPolicy": {
"minDelayTarget": 5,
"maxDelayTarget": 30,
"numRetries": 100,
"numMaxDelayRetries": 0,
"numNoDelayRetries": 3,
"numMinDelayRetries": 0,
"backoffFunction": "linear"
},
"disableSubscriptionOverrides": false
}
}
EOF
}
resource "aws_iam_role" "ham_sqs_to_elk" {
path = "/service-role/"
name = "ham_sqs-to-elk"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
EOF
max_session_duration = 3600
}
resource "aws_iam_role_policy" "ham_sqs_to_elk" {
name = "ham_sqs_to_elk"
role = aws_iam_role.ham_sqs_to_elk.name
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
]
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "sqs:*",
"Resource": "*"
}
]
}
EOF
}
resource "aws_lambda_function" "ham_sqs_to_elk" {
function_name = "ham-sqs-to-elk"
handler = "ham_sqs_to_elk.lambda_handler"
s3_bucket = aws_s3_bucket_object.lambda.bucket
s3_key = aws_s3_bucket_object.lambda.key
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.ham_sqs_to_elk.arn
runtime = "python3.9"
timeout = 5
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = aws_route53_record.es.fqdn
}
}
tags = {
Name = "ham_sqs_to_elk"
}
}
resource "aws_lambda_event_source_mapping" "ham_sqs_to_elk" {
event_source_arn = aws_sqs_queue.ham_sqs_to_elk.arn
function_name = aws_lambda_function.ham_sqs_to_elk.arn
batch_size = 20
maximum_batching_window_in_seconds = 15
}
resource "aws_sns_topic_subscription" "ham_sqs_to_elk" {
topic_arn = aws_sns_topic.ham_telem.arn
protocol = "sqs"
endpoint = aws_sqs_queue.ham_sqs_to_elk.arn
}
resource "aws_sqs_queue" "ham_sqs_to_elk" {
name = "ham-to-elk"
receive_wait_time_seconds = 1
message_retention_seconds = 1209600 # 14 days
redrive_policy = jsonencode(
{
deadLetterTargetArn = aws_sqs_queue.ham_sqs_to_elk_dlq.arn
maxReceiveCount = 100
}
)
}
resource "aws_sqs_queue" "ham_sqs_to_elk_dlq" {
name = "ham-to-elk-dlq"
receive_wait_time_seconds = 1
message_retention_seconds = 1209600 # 14 days
}
resource "aws_sqs_queue_policy" "ham_sqs_to_elk" {
queue_url = aws_sqs_queue.ham_sqs_to_elk.id
policy = <<EOF
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__owner_statement",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
},
"Action": "SQS:*",
"Resource": "${aws_sqs_queue.ham_sqs_to_elk.arn}"
},
{
"Sid": "to-elk",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "SQS:SendMessage",
"Resource": "${aws_sqs_queue.ham_sqs_to_elk.arn}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "${aws_sns_topic.ham_telem.arn}"
}
}
}
]
}
EOF
}

View File

@ -0,0 +1,35 @@
import json
import es
def lambda_handler(event, context):
payloads = {}
for record in event['Records']:
sns_message = json.loads(record["body"])
if type(json.loads(sns_message["Message"])) == dict:
incoming_payloads = [json.loads(sns_message["Message"])]
else:
incoming_payloads = json.loads(sns_message["Message"])
for payload in incoming_payloads:
index = payload['datetime'][:7]
if index not in payloads: # create index if not exists
payloads[index] = []
payloads[index].append(payload)
for index in payloads:
body=""
for payload in payloads[index]:
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
body += "\n"
result = es.request(body, f"ham-telm-{index}/_doc/_bulk", "POST")
if 'errors' in result and result['errors'] == True:
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
print(event)
print(result)
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed
if error_types:
raise RuntimeError

File diff suppressed because one or more lines are too long