add in mirroring, and redo predictor updater

This commit is contained in:
Michaela Wheeler 2021-11-29 21:09:32 +11:00
parent ed05776d15
commit 09f8136c85
15 changed files with 400 additions and 57 deletions

47
es.tf
View File

@ -1,8 +1,8 @@
resource "aws_elasticsearch_domain" "ElasticsearchDomain" {
domain_name = "sondes-v2"
elasticsearch_version = "OpenSearch_1.0"
domain_name = "sondes-v2-7-9"
elasticsearch_version = "7.9"
cluster_config {
dedicated_master_count = 3
dedicated_master_enabled = false
@ -17,7 +17,13 @@ resource "aws_elasticsearch_domain" "ElasticsearchDomain" {
role_arn = aws_iam_role.IAMRole3.arn
user_pool_id = aws_cognito_user_pool.CognitoUserPool.id
}
domain_endpoint_options {
enforce_https = true
tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
custom_endpoint = "es.v2.sondehub.org"
custom_endpoint_certificate_arn = "arn:aws:acm:us-east-1:143841941773:certificate/a7da821c-bdbc-404b-aa12-bce28d86cdeb"
custom_endpoint_enabled = true
}
access_policies = <<EOF
{
"Version": "2012-10-17",
@ -28,7 +34,7 @@ resource "aws_elasticsearch_domain" "ElasticsearchDomain" {
"AWS": "*"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2/*"
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2*"
}
]
}
@ -40,6 +46,12 @@ EOF
node_to_node_encryption {
enabled = true
}
advanced_security_options {
enabled = true
master_user_options {
master_user_arn = "arn:aws:iam::143841941773:role/es-admin"
}
}
advanced_options = {
"rest.action.multi.allow_explicit_index" = "true"
"override_main_response_version" = "true"
@ -71,7 +83,7 @@ data "aws_kms_key" "es" {
resource "aws_cognito_identity_pool" "CognitoIdentityPool" {
identity_pool_name = "sondes"
allow_unauthenticated_identities = true
allow_unauthenticated_identities = false
supported_login_providers = {
"accounts.google.com" = "575970424139-vkk7scicbdd1igj04riqjh2bbs0oa6vj.apps.googleusercontent.com"
@ -82,6 +94,17 @@ resource "aws_cognito_identity_pool" "CognitoIdentityPool" {
server_side_token_check = false
}
cognito_identity_providers {
client_id = "4uvts41d75b2r2cmsdgff47pec"
provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM"
server_side_token_check = false
}
cognito_identity_providers {
client_id = "7v892rnrta8ms785pl0aaqo8ke"
provider_name = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM"
server_side_token_check = false
}
}
resource "aws_cognito_identity_pool_roles_attachment" "CognitoIdentityPoolRoleAttachment" {
@ -90,11 +113,21 @@ resource "aws_cognito_identity_pool_roles_attachment" "CognitoIdentityPoolRoleAt
authenticated = aws_iam_role.auth_role.arn
unauthenticated = aws_iam_role.unauth_role.arn
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:4uvts41d75b2r2cmsdgff47pec"
type = "Token"
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:62uut02s5ts991uhpf4fbuvrtj"
type = "Token"
}
role_mapping {
ambiguous_role_resolution = "AuthenticatedRole"
identity_provider = "cognito-idp.us-east-1.amazonaws.com/us-east-1_G4H7NMniM:7v892rnrta8ms785pl0aaqo8ke"
type = "Token"
}
}
resource "aws_cognito_user_pool" "CognitoUserPool" {
@ -293,12 +326,12 @@ resource "aws_iam_role_policy" "IAMPolicy" {
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2"
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-*"
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2/*"
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-*"
}
]
}

View File

@ -203,12 +203,12 @@ resource "aws_iam_role_policy" "history" {
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2"
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2*"
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2/*"
"Resource": "arn:aws:es:us-east-1:${data.aws_caller_identity.current.account_id}:domain/sondes-v2*"
}
]
}

View File

@ -14,6 +14,20 @@ BUCKET = "sondehub-history"
s3 = boto3.resource('s3')
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def es_request(payload, path, method, params=None):
# get aws creds
session = boto3.Session()
@ -30,9 +44,8 @@ def es_request(payload, path, method, params=None):
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)

View File

@ -12,6 +12,20 @@ import os
import gzip
from io import BytesIO
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
HOST = os.getenv("ES")
sqs = boto3.client('sqs', region_name="us-east-1")
@ -38,9 +52,8 @@ def es_request(payload, path, method):
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)

View File

@ -16,6 +16,21 @@ from io import BytesIO
HOST = os.getenv("ES")
# get current sondes, filter by date, location
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def history(event, context):
s3 = boto3.resource('s3')
serial = str(event["pathParameters"]["serial"])
@ -97,9 +112,8 @@ def es_request(payload, path, method):
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)

View File

@ -16,6 +16,19 @@ from io import BytesIO
import base64
HOST = os.getenv("ES")
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def predict(event, context):
@ -150,9 +163,8 @@ def es_request(payload, path, method):
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)

View File

@ -16,6 +16,19 @@ from io import BytesIO
from math import radians, degrees, sin, cos, atan2, sqrt, pi
HOST = os.getenv("ES")
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
#
@ -274,7 +287,7 @@ def get_standard_prediction(conn, timestamp, latitude, longitude, altitude, curr
# Generate the prediction URL
url = f"/api/v1/?launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&launch_altitude={altitude:.2f}&ascent_rate={ascent_rate:.2f}&burst_altitude={burst_altitude:.2f}&descent_rate={descent_rate:.2f}"
logging.debug(url)
conn.request("GET", url)
res = conn.getresponse()
data = res.read()
@ -330,7 +343,7 @@ def get_launch_estimate(conn, timestamp, latitude, longitude, altitude, ascent_r
# Generate the prediction URL
url = f"/api/v1/?profile=reverse_profile&launch_latitude={latitude}&launch_longitude={longitude}&launch_datetime={timestamp}&launch_altitude={altitude:.2f}&ascent_rate={ascent_rate:.2f}"
logging.debug(url)
conn.request("GET", url)
res = conn.getresponse()
data = res.read()
@ -823,9 +836,8 @@ def es_request(params, path, method):
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
if r.status_code != 200:
raise RuntimeError

View File

@ -72,12 +72,12 @@ resource "aws_lambda_function" "predict_updater" {
filename = "${path.module}/build/predict_updater.zip"
source_code_hash = data.archive_file.predict_updater.output_base64sha256
publish = true
memory_size = 256
memory_size = 1024
role = aws_iam_role.predict_updater.arn
runtime = "python3.9"
architectures = ["arm64"]
timeout = 60
reserved_concurrent_executions = 8
timeout = 300
reserved_concurrent_executions = 1
environment {
variables = {
"ES" = aws_route53_record.es.fqdn
@ -221,6 +221,12 @@ resource "aws_ecs_task_definition" "tawhiri" {
"--threads=1",
"tawhiri.api:app"
]
dependsOn = [
{
containerName = "downloader"
condition = "SUCCESS"
}
]
cpu = 0
environment = []
essential = true
@ -254,7 +260,6 @@ resource "aws_ecs_task_definition" "tawhiri" {
volumesFrom = []
},
{
command = ["daemon"]
cpu = 0
environment = [
{
@ -262,7 +267,7 @@ resource "aws_ecs_task_definition" "tawhiri" {
value = "UTC"
}
]
essential = true
essential = false
image = "${data.aws_caller_identity.current.account_id}.dkr.ecr.us-east-1.amazonaws.com/tawhiri-downloader:latest"
logConfiguration = {
logDriver = "awslogs"
@ -295,24 +300,24 @@ resource "aws_ecs_task_definition" "tawhiri" {
volume {
name = "downloader"
}
volume {
name = "srv"
efs_volume_configuration {
file_system_id = aws_efs_file_system.tawhiri.id
root_directory = "srv"
root_directory = "/"
transit_encryption = "DISABLED"
authorization_config {
iam = "DISABLED"
}
}
}
volume {
name = "downloader"
}
}
resource "aws_ecs_task_definition" "tawhiri_ruaumoko" {
@ -615,4 +620,70 @@ resource "aws_route53_record" "tawhiri_AAAA" {
evaluate_target_health = true
}
zone_id = aws_route53_zone.Route53HostedZone.zone_id
}
}
resource "aws_iam_role" "predictor_update_trigger_lambda" {
path = "/service-role/"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
EOF
max_session_duration = 3600
}
resource "aws_iam_role_policy" "predictor_update_trigger_lambda" {
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ecs:UpdateService",
"Resource": "*"
}
]
}
EOF
role = aws_iam_role.predictor_update_trigger_lambda.name
}
data "archive_file" "predictor_update_trigger_lambda" {
type = "zip"
source_file = "tawhiri-updater/index.py"
output_path = "${path.module}/build/tawhiri-updater.zip"
}
resource "aws_lambda_function" "predictor_update_trigger_lambda" {
function_name = "tawhiri-updater"
handler = "index.handler"
filename = "${path.module}/build/tawhiri-updater.zip"
source_code_hash = data.archive_file.predictor_update_trigger_lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.predictor_update_trigger_lambda.arn
runtime = "python3.9"
timeout = 3
}
resource "aws_lambda_permission" "predictor_update_trigger_lambda" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.predictor_update_trigger_lambda.function_name
principal = "sns.amazonaws.com"
source_arn = "arn:aws:sns:us-east-1:123901341784:NewGFSObject"
}
resource "aws_sns_topic_subscription" "predictor_update_trigger_lambda" {
topic_arn = "arn:aws:sns:us-east-1:123901341784:NewGFSObject"
protocol = "lambda"
endpoint = aws_lambda_function.predictor_update_trigger_lambda.arn
}
# sns subscription

View File

@ -13,6 +13,19 @@ import base64
import gzip
from io import BytesIO
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
HOST = os.getenv("ES")
# get current sondes, filter by date, location
@ -412,9 +425,8 @@ def es_request(payload, path, method):
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
@ -438,17 +450,17 @@ if __name__ == "__main__":
# {},
# )
# )
print(get_sites({},{}))
# print(
# get_telem(
# {
# "queryStringParameters": {
# "duration": "3d",
# "serial": "P4120469"
# }},{}
# print(get_sites({},{}))
print(
get_telem(
{
"queryStringParameters": {
"duration": "3d",
"serial": "P4120469"
}},{}
# )
# )
)
)
# print (
# get_chase(
# {"queryStringParameters": {

View File

@ -13,7 +13,19 @@ from io import BytesIO
import gzip
HOST = os.getenv("ES")
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def es_request(payload, path, method):
# get aws creds
@ -32,9 +44,8 @@ def es_request(payload, path, method):
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)

View File

@ -17,6 +17,18 @@ import base64
HOST = os.getenv("ES")
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
r = http_session.send(request.prepare())
def predict(event, context):
path = "reverse-prediction-*/_search"
@ -173,6 +185,7 @@ def es_request(payload, path, method):
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
p = Process(target=mirror, args=(path,params)).start()
session = URLLib3Session()
r = session.send(request.prepare())
return json.loads(r.text)

View File

@ -9,6 +9,7 @@ from io import BytesIO
import gzip
null = None
HOST = os.getenv("ES")
http_session = URLLib3Session()
def es_request(payload, path, method):
# get aws creds
@ -23,8 +24,7 @@ def es_request(payload, path, method):
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
r = http_session.send(request.prepare())
if r.status_code != 200:
raise RuntimeError
return json.loads(r.text)

View File

@ -95,6 +95,13 @@ resource "aws_sqs_queue" "sqs_to_elk" {
name = "to-elk"
receive_wait_time_seconds = 1
message_retention_seconds = 1209600 # 14 days
redrive_policy = jsonencode(
{
deadLetterTargetArn = "arn:aws:sqs:us-east-1:143841941773:to-elk-dlq"
maxReceiveCount = 100
}
)
}
resource "aws_sqs_queue_policy" "sqs_to_elk" {
@ -130,4 +137,78 @@ resource "aws_sqs_queue_policy" "sqs_to_elk" {
]
}
EOF
}
# temporary opensearch test
resource "aws_lambda_function" "sqs_to_elk_os" {
function_name = "sqs-to-elk-os"
handler = "lambda_function.lambda_handler"
filename = "${path.module}/build/sqs-to-elk.zip"
source_code_hash = data.archive_file.sqs_to_elk.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.sqs_to_elk.arn
runtime = "python3.9"
timeout = 5
reserved_concurrent_executions = 100
environment {
variables = {
"ES" = "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com"
}
}
}
resource "aws_lambda_event_source_mapping" "sqs_to_elk_os" {
event_source_arn = aws_sqs_queue.sqs_to_elk_os.arn
function_name = aws_lambda_function.sqs_to_elk_os.arn
batch_size = 20
maximum_batching_window_in_seconds = 15
}
resource "aws_sns_topic_subscription" "sqs_to_elk_os" {
topic_arn = aws_sns_topic.sonde_telem.arn
protocol = "sqs"
endpoint = aws_sqs_queue.sqs_to_elk_os.arn
}
resource "aws_sqs_queue" "sqs_to_elk_os" {
name = "to-elk-os"
receive_wait_time_seconds = 1
message_retention_seconds = 1209600 # 14 days
}
resource "aws_sqs_queue_policy" "sqs_to_elk_os" {
queue_url = aws_sqs_queue.sqs_to_elk_os.id
policy = <<EOF
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__owner_statement",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
},
"Action": "SQS:*",
"Resource": "${aws_sqs_queue.sqs_to_elk_os.arn}"
},
{
"Sid": "to-elk",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "SQS:SendMessage",
"Resource": "${aws_sqs_queue.sqs_to_elk_os.arn}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "${aws_sns_topic.sonde_telem.arn}"
}
}
}
]
}
EOF
}

View File

@ -17,6 +17,20 @@ import os
from io import BytesIO
import gzip
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
r = http_session.send(request.prepare())
HOST = os.getenv("ES")
def lambda_handler(event, context):
@ -78,7 +92,7 @@ def es_request(payload, path, method):
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
p = Process(target=mirror, args=(path,payload)).start()
session = URLLib3Session()
r = session.send(request.prepare())
if r.status_code != 200 and r.status_code != 201:

44
tawhiri-updater/index.py Normal file
View File

@ -0,0 +1,44 @@
import boto3
import json
import re
MATCH_OBJECT = re.compile(r"^gfs.\d{8}/\d{2}/atmos/gfs.t\d{2}z.prgb2.0p50.f192$")
BUCKET = 'noaa-gfs-bdp-pds'
SERVICE_NAME="tawhiri"
CLUSTER_NAME="Tawhiri"
ecs = boto3.client('ecs', region_name="us-east-1")
def handler(event, context):
for record in event["Records"]:
message = json.loads(record["Sns"]["Message"])
for inner_record in message['Records']:
if "ObjectCreated" in inner_record['eventName']:
if inner_record['s3']['bucket']['name'] == BUCKET:
if MATCH_OBJECT.match(inner_record['s3']['object']['key']):
print(f"Found new GFS - updating service {inner_record['s3']['object']['key']}")
ecs.update_service(cluster=CLUSTER_NAME, service=SERVICE_NAME, forceNewDeployment=True)
if __name__ == "__main__":
handler(
{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:123901341784:NewGFSObject:47c29dbe-6482-4495-bc25-41686b9c8f30",
"Sns": {
"Type": "Notification",
"MessageId": "4ddb4adc-c245-5f45-bb5b-10c24be3e4b5",
"TopicArn": "arn:aws:sns:us-east-1:123901341784:NewGFSObject",
"Subject": "Amazon S3 Notification",
"Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2021-11-29T07:55:17.516Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AROAIFS7SMW4FSODZYIMM:Fetch_GFS_NCEP\"},\"requestParameters\":{\"sourceIPAddress\":\"52.204.74.204\"},\"responseElements\":{\"x-amz-request-id\":\"ZSHQPQ51RFMMDSCJ\",\"x-amz-id-2\":\"NSLglU4PxYEEXmKN4LHrJg3jeHjKafCU6SaDSWbfwKjvcfKsrpMB/SLvfW+lKjn0d256kNhV845Cu/OHxMrC/GQ1EEVn4ODC\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"NjNmNjg3MWUtNTAzNy00YTcxLWI3ZGMtM2MzMjI2OGY5Y2Ey\",\"bucket\":{\"name\":\"noaa-gfs-bdp-pds\",\"ownerIdentity\":{\"principalId\":\"A2AJV00K47QOI1\"},\"arn\":\"arn:aws:s3:::noaa-gfs-bdp-pds\"},\"object\":{\"key\":\"gfs.20211127/12/atmos/gfs.t12z.prgb2.0p50.f192\",\"size\":5242,\"eTag\":\"2e6fa824124d06b1e0af0a6c852f37cc\",\"sequencer\":\"0061A48765785BC5E7\"}}}]}",
"Timestamp": "2021-11-29T07:55:18.716Z",
"SignatureVersion": "1",
"Signature": "Jb7AzFgOzDXgsllGk04XJZQv3KF+2/JXziU2uFV6r5fti3GiLzQm9gZtx2imUuLCfNayFBRckzV3Q7ZxxxoUcebg0gG6Is0j/sVVHauLX/VhdkmyyjdkeJdqsnnBMOGCxiMXwO6YRAmTFM5Fx1WXiPLc5+TKoxxM1OmtPBkirmheJOpSzyvAX/BN8XdD+E/WjBtUZnc0qpy5kN/MVm6pwiNUNTZlMjBtPC8+qw9a04HGk2SkWb/nSksoYZnTnWDrxVu7lpQc7QnG2RA8KrevgisSyfMweeWKfQe1zRs6e+Uopepto48UsZ08A340kUcEsEdXf/XW5xMlPYrgIrTTXQ==",
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-7ff5318490ec183fbaddaa2a969abfda.pem",
"UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123901341784:NewGFSObject:47c29dbe-6482-4495-bc25-41686b9c8f30",
"MessageAttributes": {}
}
}
]
}, {})