add history

This commit is contained in:
Michaela 2021-02-15 15:23:51 +10:00
parent 414c4dc989
commit 084cef93b2
3 changed files with 199 additions and 10 deletions

View File

@ -0,0 +1,68 @@
import boto3
import json
import os
from datetime import datetime, timedelta
import threading
from queue import Queue
import queue
# Name of the S3 bucket that download() lists and reads objects from.
S3_BUCKET = "sondehub-open-data"
class Downloader(threading.Thread):
    """Worker thread that fetches queued S3 objects and parses them as JSON.

    Copied from the SDK; a shared build chain for this lambda would let both
    projects reuse one implementation instead of duplicating it here.

    Each task taken from ``tasks_to_accomplish`` is a ``(bucket, key)`` pair.
    The object body is decoded with ``json.loads`` and the result is put on
    ``tasks_that_are_done``. The thread exits once the input queue is empty.

    Args:
        tasks_to_accomplish: queue of ``(bucket, key)`` tuples to download.
        tasks_that_are_done: queue that receives each parsed JSON document.
        debug: when true, print every parsed document.
        *args: forwarded to ``threading.Thread``.
        client: optional object exposing ``get_object(Bucket=..., Key=...)``.
            When omitted, the thread creates its own ``boto3.client("s3")``.
        **kwargs: forwarded to ``threading.Thread``.
    """

    def __init__(
        self,
        tasks_to_accomplish,
        tasks_that_are_done,
        debug=False,
        *args,
        client=None,
        **kwargs,
    ):
        self.tasks_to_accomplish = tasks_to_accomplish
        self.tasks_that_are_done = tasks_that_are_done
        self.debug = debug
        self.client = client
        super().__init__(*args, **kwargs)

    def run(self):
        """Process tasks until the input queue is empty.

        A task that fails (download error, invalid JSON) is logged and
        skipped. ``task_done()`` is called for every task taken, successful or
        not, so a ``join()`` on the input queue cannot block forever because
        of a failed download.
        """
        s3 = self.client if self.client is not None else boto3.client("s3")
        while True:
            try:
                task = self.tasks_to_accomplish.get_nowait()
            except queue.Empty:
                return
            try:
                data = s3.get_object(Bucket=task[0], Key=task[1])
                response = json.loads(data["Body"].read())
                if self.debug:
                    print(response)
                self.tasks_that_are_done.put(response)
            except Exception as err:
                # Worker boundary: report the failure and keep draining the
                # queue so one bad object does not stall the whole request.
                print(f"Failed to download {task!r}: {err!r}")
            finally:
                # Must run even on failure, otherwise Queue.join() never returns.
                self.tasks_to_accomplish.task_done()
def download(serial):
    """Download and parse every object stored for one serial.

    Lists the keys under ``serial-hashed/<serial>/`` in ``S3_BUCKET``, fetches
    them concurrently with ``Downloader`` threads and returns the parsed JSON
    documents. The order of the returned list is not defined because the
    workers finish in arbitrary order.

    Args:
        serial: serial number used to build the key prefix.

    Returns:
        A list of parsed JSON documents; empty when no object matches.
    """
    prefix_filter = f"serial-hashed/{serial}/"
    bucket = boto3.resource("s3").Bucket(S3_BUCKET)

    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    for s3_object in bucket.objects.filter(Prefix=prefix_filter):
        tasks_to_accomplish.put((s3_object.bucket_name, s3_object.key))

    # Nothing to fetch: skip starting any worker threads.
    if tasks_to_accomplish.empty():
        return []

    # Never start more workers than there are objects; every worker builds its
    # own S3 client, so idle workers are pure overhead. qsize() is exact here
    # because no consumer has been started yet.
    max_workers = 200
    worker_count = min(max_workers, tasks_to_accomplish.qsize())
    for _ in range(worker_count):
        Downloader(tasks_to_accomplish, tasks_that_are_done, False).start()
    tasks_to_accomplish.join()

    data = []
    while not tasks_that_are_done.empty():
        data.append(tasks_that_are_done.get())
    return data
def history(event, context):
    """Lambda handler returning the downloaded data for one serial as JSON text.

    The serial is read from ``event["pathParameters"]["serial"]``; ``context``
    is not used.
    """
    serial = event["pathParameters"]["serial"]
    return json.dumps(download(serial=serial))
if __name__ == "__main__":
    # Manual smoke test: invoke the handler locally for a fixed serial.
    sample_event = {"pathParameters": {"serial": "R2450480"}}
    print(history(sample_event, {}))

117
main.tf
View File

@ -18,7 +18,9 @@ locals {
}
data "aws_caller_identity" "current" {}
data "aws_iot_endpoint" "endpoint" {}
data "aws_iot_endpoint" "endpoint" {
endpoint_type = "iot:Data-ATS"
}
resource "aws_iam_role" "IAMRole" {
path = "/"
@ -143,6 +145,23 @@ EOF
max_session_duration = 3600
}
# Execution role used by the "history" Lambda function.
# The trust policy lets only the Lambda service assume it.
resource "aws_iam_role" "history" {
  name                 = "history"
  max_session_duration = 3600
  assume_role_policy   = <<EOF
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}
EOF
}
resource "aws_iam_service_linked_role" "IAMServiceLinkedRole" {
aws_service_name = "es.amazonaws.com"
}
@ -171,7 +190,7 @@ resource "aws_iam_policy" "IAMManagedPolicy" {
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/sonde-api-to-iot-core:*"
"arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
]
}
]
@ -323,6 +342,43 @@ EOF
role = aws_iam_role.sign_socket.name
}
# Inline policy for the history Lambda's execution role.
#
# S3 access is limited to what the handler does: listing keys under a prefix
# (s3:ListBucket on the bucket) and reading objects (s3:GetObject on its keys).
# The previous "s3:*" also allowed writes, deletes and bucket reconfiguration.
#
# The X-Ray statement lets the function upload traces, since the function is
# configured with tracing mode "Active".
# NOTE(review): drop it if X-Ray write access is already attached to this role
# elsewhere.
resource "aws_iam_role_policy" "history" {
  role   = aws_iam_role.history.name
  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::sondehub-open-data/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::sondehub-open-data"
    },
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "xray:PutTraceSegments",
        "xray:PutTelemetryRecords"
      ],
      "Resource": "*"
    }
  ]
}
EOF
}
resource "aws_route53_zone" "Route53HostedZone" {
name = "${local.domain_name}."
}
@ -407,6 +463,12 @@ data "archive_file" "query" {
output_path = "${path.module}/build/query.zip"
}
# Zips the history handler source into the archive deployed by
# aws_lambda_function.history.
data "archive_file" "history" {
  type        = "zip"
  output_path = "${path.module}/build/history.zip"
  source_file = "history/lambda_function.py"
}
data "archive_file" "sign_socket" {
type = "zip"
@ -511,6 +573,25 @@ resource "aws_lambda_function" "sign_socket" {
]
}
# Lambda function for the history endpoint; its entry point is history() in
# lambda_function.py, packaged by data.archive_file.history.
resource "aws_lambda_function" "history" {
  function_name = "history"
  runtime       = "python3.7"
  handler       = "lambda_function.history"
  role          = aws_iam_role.history.arn

  # Deployment package; the hash triggers a redeploy when the zip changes.
  filename         = "${path.module}/build/history.zip"
  source_code_hash = data.archive_file.history.output_base64sha256
  publish          = true

  memory_size = 1024
  timeout     = 30

  layers = [
    "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:xray-python:1",
    "arn:aws:lambda:us-east-1:${data.aws_caller_identity.current.account_id}:layer:iot:3"
  ]

  tracing_config {
    mode = "Active"
  }
}
resource "aws_lambda_permission" "sign_socket" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.sign_socket.arn
@ -518,6 +599,13 @@ resource "aws_lambda_permission" "sign_socket" {
source_arn = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:r03szwwq41/*/*/sondes/websocket"
}
# Allows API Gateway to invoke the history Lambda, scoped to the
# /sonde/{serial} route of the API named in source_arn.
resource "aws_lambda_permission" "history" {
  principal     = "apigateway.amazonaws.com"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.history.arn
  source_arn    = "arn:aws:execute-api:us-east-1:${data.aws_caller_identity.current.account_id}:r03szwwq41/*/*/sonde/{serial}"
}
resource "aws_lambda_permission" "get_sondes" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.get_sondes.arn
@ -620,6 +708,13 @@ resource "aws_apigatewayv2_route" "sign_socket" {
route_key = "GET /sondes/websocket"
target = "integrations/${aws_apigatewayv2_integration.sign_socket.id}"
}
# Public, unauthenticated route that sends GET /sonde/{serial} to the history
# integration.
resource "aws_apigatewayv2_route" "history" {
  api_id    = aws_apigatewayv2_api.ApiGatewayV2Api.id
  route_key = "GET /sonde/{serial}"
  target    = "integrations/${aws_apigatewayv2_integration.history.id}"

  authorization_type = "NONE"
  api_key_required   = false
}
resource "aws_apigatewayv2_route" "get_sondes" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
@ -647,6 +742,16 @@ resource "aws_apigatewayv2_integration" "sign_socket" {
payload_format_version = "2.0"
}
# Lambda proxy integration that connects the API to the history function.
resource "aws_apigatewayv2_integration" "history" {
  api_id           = aws_apigatewayv2_api.ApiGatewayV2Api.id
  integration_type = "AWS_PROXY"
  integration_uri  = aws_lambda_function.history.arn

  integration_method     = "POST"
  connection_type        = "INTERNET"
  payload_format_version = "2.0"

  # Same 30 s limit as the Lambda function's own timeout.
  timeout_milliseconds = 30000
}
resource "aws_apigatewayv2_integration" "get_sondes" {
api_id = aws_apigatewayv2_api.ApiGatewayV2Api.id
connection_type = "INTERNET"
@ -708,9 +813,9 @@ resource "aws_elasticsearch_domain" "ElasticsearchDomain" {
dedicated_master_count = 3
dedicated_master_enabled = false
dedicated_master_type = "t3.small.elasticsearch"
instance_count = 2
instance_type = "m5.large.elasticsearch"
zone_awareness_enabled = true
instance_count = 1
instance_type = "r5.large.elasticsearch"
zone_awareness_enabled = false
}
cognito_options {
enabled = true
@ -747,7 +852,7 @@ EOF
ebs_options {
ebs_enabled = true
volume_type = "gp2"
volume_size = 10
volume_size = 60
}
}
data "aws_kms_key" "es" {

View File

@ -5,6 +5,7 @@ from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta
HOST = os.getenv("ES")
# get current sondes, filter by date, location
@ -88,6 +89,8 @@ def get_telem(event, context):
"3h": (10800, 10), # 3h, 10s
}
duration_query = "3h"
requested_time = datetime.now()
if (
"queryStringParameters" in event
@ -98,7 +101,19 @@ def get_telem(event, context):
else:
return f"Duration must be either {', '.join(durations.keys())}"
if (
"queryStringParameters" in event
and "datetime" in event["queryStringParameters"]
):
requested_time = datetime.fromisoformat(event["queryStringParameters"]["datetime"].replace("Z", "+00:00"))
(duration, interval) = durations[duration_query]
lt = requested_time
gte = requested_time - timedelta(0,duration)
path = "telm-*/_search"
payload = {
@ -141,7 +156,10 @@ def get_telem(event, context):
{"match_all": {}},
{
"range": {
"datetime": {"gte": f"now-{str(duration)}s", "lt": "now"}
"datetime": {
"gte": gte.isoformat(),
"lt": lt.isoformat()
}
}
},
]
@ -150,7 +168,6 @@ def get_telem(event, context):
}
if "queryStringParameters" in event:
if "serial" in event["queryStringParameters"]:
print("test")
payload["query"]["bool"]["filter"].append(
{
"match_phrase": {
@ -158,7 +175,6 @@ def get_telem(event, context):
}
}
)
print(payload)
results = es_request(payload, path, "POST")
output = {
sonde["key"]: {
@ -190,6 +206,6 @@ if __name__ == "__main__":
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
print(
get_telem(
{"queryStringParameters": {"serial": "R4450388", "duration": "3h"}}, {}
{"queryStringParameters": {"duration": "6h", "datetime": "2021-01-31T00:10:40.001000Z"}}, {}
)
)