This commit is contained in:
Michaela Wheeler 2021-12-20 17:02:02 +11:00
parent 9bd009ea1c
commit 512e366607
53 changed files with 5802 additions and 1206 deletions

View File

@ -1,14 +1,4 @@
data "archive_file" "historic_to_s3" {
type = "zip"
source_file = "historic/historic_es_to_s3/index.py"
output_path = "${path.module}/build/historic_to_s3.zip"
}
data "archive_file" "queue_data_update" {
type = "zip"
source_file = "historic/queue_data_update/index.py"
output_path = "${path.module}/build/queue_data_update.zip"
}
resource "aws_iam_role" "historic" {
path = "/service-role/"
@ -74,9 +64,9 @@ EOF
resource "aws_lambda_function" "historic_to_s3" {
function_name = "historic_to_s3"
handler = "index.handler"
filename = "${path.module}/build/historic_to_s3.zip"
source_code_hash = data.archive_file.historic_to_s3.output_base64sha256
handler = "historic_es_to_s3.handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 3096
role = aws_iam_role.historic.arn
@ -91,9 +81,9 @@ resource "aws_lambda_function" "historic_to_s3" {
}
resource "aws_lambda_function" "queue_data_update" {
function_name = "queue_data_update"
handler = "index.handler"
filename = "${path.module}/build/queue_data_update.zip"
source_code_hash = data.archive_file.queue_data_update.output_base64sha256
handler = "queue_data_update.handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.historic.arn
@ -216,17 +206,12 @@ EOF
role = aws_iam_role.history.name
}
data "archive_file" "history" {
type = "zip"
source_file = "history/lambda_function.py"
output_path = "${path.module}/build/history.zip"
}
resource "aws_lambda_function" "history" {
function_name = "history"
handler = "lambda_function.history"
filename = "${path.module}/build/history.zip"
source_code_hash = data.archive_file.history.output_base64sha256
handler = "history.history"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 512
role = aws_iam_role.basic_lambda_role.arn

View File

@ -1,20 +1,8 @@
data "archive_file" "api_to_iot" {
type = "zip"
source_dir = "sonde-api-to-iot-core/"
output_path = "${path.module}/build/sonde-api-to-iot-core.zip"
}
data "archive_file" "station_api_to_iot" {
type = "zip"
source_file = "station-api-to-iot-core/lambda_function.py"
output_path = "${path.module}/build/station-api-to-iot-core.zip"
}
resource "aws_lambda_function" "upload_telem" {
function_name = "sonde-api-to-iot-core"
handler = "lambda_function.lambda_handler"
filename = "${path.module}/build/sonde-api-to-iot-core.zip"
source_code_hash = data.archive_file.api_to_iot.output_base64sha256
handler = "sonde-api-to-iot-core.lambda_handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.basic_lambda_role.arn
@ -30,9 +18,9 @@ resource "aws_lambda_function" "upload_telem" {
resource "aws_lambda_function" "station" {
function_name = "station-api-to-iot-core"
handler = "lambda_function.lambda_handler"
filename = "${path.module}/build/station-api-to-iot-core.zip"
source_code_hash = data.archive_file.station_api_to_iot.output_base64sha256
handler = "station-api-to-iot-core.lambda_handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.basic_lambda_role.arn

0
lambda/__init__.py Normal file
View File

34
lambda/es/__init__.py Normal file
View File

@ -0,0 +1,34 @@
import boto3
import gzip
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
from io import BytesIO
import json
import os
es_session = URLLib3Session()
ES_HOST = os.getenv("ES")
def request(payload, path, method, params=None):
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": ES_HOST, "Content-Type": "application/json",
"Content-Encoding": "gzip"}
request = AWSRequest(
method=method, url=f"https://{ES_HOST}/{path}", data=payload, headers=headers, params=params
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
r = es_session.send(request.prepare())
if r.status_code != 200 and r.status_code != 201:
raise RuntimeError
return json.loads(r.text)

View File

@ -1,54 +1,13 @@
import json
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import boto3
import botocore.credentials
import os
import gzip
from botocore.exceptions import ClientError
from io import BytesIO
import es
HOST = os.getenv("ES")
BUCKET = "sondehub-history"
s3 = boto3.resource('s3')
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def es_request(payload, path, method, params=None):
# get aws creds
session = boto3.Session()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method=method, url=f"https://{HOST}/{path}", data=payload, headers=headers, params=params
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
def fetch_es(serial):
payload = {
"size": 10000,
@ -74,7 +33,7 @@ def fetch_es(serial):
}
}
data = []
response = es_request(json.dumps(payload),
response = es.request(json.dumps(payload),
"telm-*/_search", "POST", params={"scroll": "1m"})
try:
data += [x["_source"] for x in response['hits']['hits']]
@ -84,13 +43,13 @@ def fetch_es(serial):
scroll_id = response['_scroll_id']
scroll_ids = [scroll_id]
while response['hits']['hits']:
response = es_request(json.dumps({"scroll": "1m", "scroll_id": scroll_id }),
response = es.request(json.dumps({"scroll": "1m", "scroll_id": scroll_id }),
"_search/scroll", "POST")
scroll_id = response['_scroll_id']
scroll_ids.append(scroll_id)
data += [x["_source"] for x in response['hits']['hits']]
for scroll_id in scroll_ids:
scroll_delete = es_request(json.dumps({"scroll_id": scroll_id }),
scroll_delete = es.request(json.dumps({"scroll_id": scroll_id }),
"_search/scroll", "DELETE")
print(scroll_delete)
return data
@ -184,7 +143,7 @@ def fetch_launch_sites():
}
}
response = es_request(json.dumps(payload),
response = es.request(json.dumps(payload),
"reverse-prediction-*/_search", "POST")
data = { x['key'] : x for x in response['aggregations']['2']['buckets']}
output = {}
@ -269,27 +228,3 @@ def handler(event, context):
write_s3(serial, data, launch_sites)
print(f"{serial} done")
if __name__ == "__main__":
print(handler(
{
"Records": [
{
"messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
"receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
"body": "R0230678",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1627873604999",
"SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
"ApproximateFirstReceiveTimestamp": "1627873751266"
},
"messageAttributes": {},
"md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
"md5OfMessageAttributes": None,
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
"awsRegion": "us-east-1"
}
]
}, {}))

View File

@ -0,0 +1,23 @@
from . import *
print(handler(
{
"Records": [
{
"messageId": "3b5853b3-369c-40bf-8746-130c918fbb5c",
"receiptHandle": "AQEBg+/MIA2rSNmlrpXvk7pbi26kgIzqhairaHWGSpMgLzf2T54PLUmG+eG6CDOv35e42scDH0gppmS9RTQVu8D161oHYohhd1+0S4LtFJdgXr3At86NBIky5+y1A/wVuUm1FNQSvGKIDRgBhCgcCzLkEMNYIWWeDmg2ez2SCPu/3hmY5sc7eC32uqz5es9PspgQXOTykmoNv/q37iy2RBRDdu51Tq7yIxEr+Us3qkQrddAJ7qsA0l09aRL+/PJe1V/7MMN3CFyBATpRP/G3Gjn0Iuu4i2UhkRx2pF+0Hj8yhhHbqTMcw5sbbGIWMdsMXFQKUCHNx6HPkbuwIWo0TsINQjY7IXeZM/mNq65xC4avSlctJ/9BMzOBtFwbnRPZfHmlS5Al2nF1Vu3RecFGbTm1nQ==",
"body": "R0230678",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1627873604999",
"SenderId": "AROASC7NF3EG5DNHEPSYZ:queue_data_update",
"ApproximateFirstReceiveTimestamp": "1627873751266"
},
"messageAttributes": {},
"md5OfBody": "b3d67879b6a2e7f3abd62d404e53f71f",
"md5OfMessageAttributes": None,
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:update-history",
"awsRegion": "us-east-1"
}
]
}, {}))

View File

@ -1,34 +1,9 @@
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta, timezone
import sys, traceback
import uuid
import gzip
from io import BytesIO
import es
# TODO , HEAD S3 object, if it's less than 24 hours check ES, else 302 to bucket
HOST = os.getenv("ES")
# get current sondes, filter by date, location
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def history(event, context):
@ -82,7 +57,7 @@ def history(event, context):
},
}
results = es_request(payload, path, "POST")
results = es.request(json.dumps(payload), path, "POST")
output = [
{k: v for k, v in data["1"]["hits"]["hits"][0]["_source"].items() if k != 'user-agent' and k != 'upload_time_delta'}
@ -95,35 +70,5 @@ def history(event, context):
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
print(
history(
{"pathParameters": {"serial": "T1510227"}}, {}
)
)

View File

@ -0,0 +1,6 @@
from . import *
print(
history(
{"pathParameters": {"serial": "T1510227"}}, {}
)
)

View File

@ -1,34 +1,11 @@
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta, timezone
import sys
import traceback
import http.client
import math
import logging
import gzip
from io import BytesIO
import base64
import es
HOST = os.getenv("ES")
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def predict(event, context):
@ -109,7 +86,7 @@ def predict(event, context):
# for single sonde allow longer predictions
payload['query']['bool']['filter'].pop(0)
logging.debug("Start ES Request")
results = es_request(payload, path, "GET")
results = es.request(json.dumps(payload), path, "GET")
logging.debug("Finished ES Request")
output = []
for sonde in results['aggregations']['2']['buckets']:
@ -144,46 +121,3 @@ def predict(event, context):
}
}
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json",
"Content-Encoding": "gzip"}
request = AWSRequest(
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
# mode: 6hours
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
print(predict(
{"queryStringParameters": {
"vehicles": ""
}}, {}
))
# get list of sondes, serial, lat,lon, alt
# and current rate
# for each one, request http://predict.cusf.co.uk/api/v1/?launch_latitude=-37.8136&launch_longitude=144.9631&launch_datetime=2021-02-22T00:15:18.513413Z&launch_altitude=30000&ascent_rate=5&burst_altitude=30000.1&descent_rate=5
# have to set the burst alt slightly higher than the launch

View File

@ -0,0 +1,7 @@
from . import *
print(predict(
{"queryStringParameters": {
"vehicles": ""
}}, {}
))

View File

@ -1,37 +1,13 @@
from json.encoder import py_encode_basestring_ascii
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta, timezone
import sys, traceback
from datetime import datetime
import http.client
import math
import logging
import gzip
from io import BytesIO
from math import radians, degrees, sin, cos, atan2, sqrt, pi
HOST = os.getenv("ES")
http_session = URLLib3Session()
from multiprocessing import Process
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
import es
#
# FLIGHT PROFILE DEFAULTS
#
@ -408,7 +384,7 @@ def get_reverse_predictions():
}
}
logging.debug("Start ES Request")
results = es_request(json.dumps(payload), path, "POST")
results = es.request(json.dumps(payload), path, "POST")
logging.debug("Finished ES Request")
return { x['_source']['serial'] : x['_source'] for x in results['hits']['hits']}
@ -427,7 +403,7 @@ def get_launch_sites():
"size": 10000
}
logging.debug("Start ES Request")
results = es_request(json.dumps(payload), path, "POST")
results = es.request(json.dumps(payload), path, "POST")
logging.debug("Finished ES Request")
return {x['_source']['station']: x['_source'] for x in results['hits']['hits']}
@ -438,7 +414,7 @@ def bulk_upload_es(index_prefix,payloads):
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
body += "\n"
date_prefix = datetime.now().strftime("%Y-%m")
result = es_request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST")
result = es.request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST")
if 'errors' in result and result['errors'] == True:
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
@ -583,7 +559,7 @@ def predict(event, context):
"size": 0
}
logging.debug("Start ES Request")
results = es_request(json.dumps(payload), path, "GET")
results = es.request(json.dumps(payload), path, "GET")
logging.debug("Finished ES Request")
@ -821,69 +797,5 @@ def predict(event, context):
logging.debug("Finished")
return
def es_request(params, path, method):
# get aws creds
session = boto3.Session()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
if r.status_code != 200:
raise RuntimeError
return json.loads(r.text)
if __name__ == "__main__":
# Predictor test
# conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org")
# _now = datetime.utcnow().isoformat() + "Z"
# _ascent = get_standard_prediction(conn, _now, -34.0, 138.0, 10.0, burst_altitude=26000)
# print(f"Got {len(_ascent)} data points for ascent prediction.")
# _descent = get_standard_prediction(conn, _now, -34.0, 138.0, 24000.0, burst_altitude=24000.5)
# print(f"Got {len(_descent)} data points for descent prediction.")
# test = predict(
# {},{}
# )
#print(get_launch_sites())
#print(get_reverse_predictions())
# for _serial in test:
# print(f"{_serial['serial']}: {len(_serial['data'])}")
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG
)
print(predict(
{},{}
))
# bulk_upload_es("reverse-prediction",[{
# "datetime" : "2021-10-04",
# "data" : { },
# "serial" : "R12341234",
# "station" : "-2",
# "subtype" : "RS41-SGM",
# "ascent_rate" : "5",
# "alt" : 1000,
# "position" : [
# 1,
# 2
# ],
# "type" : "RS41"
# }]
# )

View File

@ -0,0 +1,43 @@
from . import *
# Predictor test
# conn = http.client.HTTPSConnection("tawhiri.v2.sondehub.org")
# _now = datetime.utcnow().isoformat() + "Z"
# _ascent = get_standard_prediction(conn, _now, -34.0, 138.0, 10.0, burst_altitude=26000)
# print(f"Got {len(_ascent)} data points for ascent prediction.")
# _descent = get_standard_prediction(conn, _now, -34.0, 138.0, 24000.0, burst_altitude=24000.5)
# print(f"Got {len(_descent)} data points for descent prediction.")
# test = predict(
# {},{}
# )
#print(get_launch_sites())
#print(get_reverse_predictions())
# for _serial in test:
# print(f"{_serial['serial']}: {len(_serial['data'])}")
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG
)
print(predict(
{},{}
))
# bulk_upload_es("reverse-prediction",[{
# "datetime" : "2021-10-04",
# "data" : { },
# "serial" : "R12341234",
# "station" : "-2",
# "subtype" : "RS41-SGM",
# "ascent_rate" : "5",
# "alt" : 1000,
# "position" : [
# 1,
# 2
# ],
# "type" : "RS41"
# }]
# )

View File

@ -1,35 +1,9 @@
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta, timezone
import sys, traceback
import re
import html
import base64
import gzip
from io import BytesIO
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
HOST = os.getenv("ES")
# get current sondes, filter by date, location
import es
def get_sondes(event, context):
path = "telm-*/_search"
@ -93,7 +67,7 @@ def get_sondes(event, context):
{"range": {"datetime": {"gte": "now-1d", "lte": "now+1m"}}}
)
results = es_request(payload, path, "POST")
results = es.request(json.dumps(payload), path, "POST")
buckets = results["aggregations"]["2"]["buckets"]
sondes = {
bucket["1"]["hits"]["hits"][0]["_source"]["serial"]: bucket["1"]["hits"][
@ -206,7 +180,7 @@ def get_telem(event, context):
}
}
)
results = es_request(payload, path, "POST")
results = es.request(json.dumps(payload), path, "POST")
output = {
sonde["key"]: {
data["key_as_string"]: dict(data["1"]["hits"]["hits"][0]["_source"],
@ -335,7 +309,7 @@ def get_listener_telemetry(event, context):
}
}
)
results = es_request(payload, path, "POST")
results = es.request(json.dumps(payload), path, "POST")
output = {
sonde["key"]: {
data["key_as_string"]: data["1"]["hits"]["hits"][0]["_source"]
@ -389,7 +363,7 @@ def get_sites(event, context):
}
}
)
results = es_request(payload, path, "POST")
results = es.request(json.dumps(payload), path, "POST")
output = {x['_source']['station']: x['_source'] for x in results['hits']['hits']}
compressed = BytesIO()
@ -410,89 +384,5 @@ def get_sites(event, context):
}
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
#print(get_sondes({"queryStringParameters":{"lat":"-32.7933","lon":"151.8358","distance":"5000", "last":"604800"}}, {}))
# mode: 6hours
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
# print(
# datanew(
# {
# "queryStringParameters": {
# "mode": "single",
# "format": "json",
# "position_id": "S1443103-2021-07-20T12:46:19.040000Z"
# }
# },
# {},
# )
# )
# print(get_sites({},{}))
print(
get_telem(
{
"queryStringParameters": {
"duration": "1d",
"serial": "S4430086"
}},{}
)
)
# print (
# get_chase(
# {"queryStringParameters": {
# "duration": "1d"
# }
# },
# {}
# )
# )
# print(
# datanew(
# {
# "queryStringParameters": {
# "type": "positions",
# "mode": "3hours",
# "position_id": "0"
# }
# },
# {},
# )
# )
# print(
# get_telem(
# {
# "queryStringParameters":{
# # "serial": "S3210639",
# "duration": "3h",
# # "datetime": "2021-07-26T06:49:29.001000Z"
# }
# }, {}
# )
# )

66
lambda/query/__main__.py Normal file
View File

@ -0,0 +1,66 @@
from . import *
#print(get_sondes({"queryStringParameters":{"lat":"-32.7933","lon":"151.8358","distance":"5000", "last":"604800"}}, {}))
# mode: 6hours
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
# print(
# datanew(
# {
# "queryStringParameters": {
# "mode": "single",
# "format": "json",
# "position_id": "S1443103-2021-07-20T12:46:19.040000Z"
# }
# },
# {},
# )
# )
# print(get_sites({},{}))
print(
get_telem(
{
"queryStringParameters": {
"duration": "1d",
# "serial": "S4430086"
}},{}
)
)
# print (
# get_chase(
# {"queryStringParameters": {
# "duration": "1d"
# }
# },
# {}
# )
# )
# print(
# datanew(
# {
# "queryStringParameters": {
# "type": "positions",
# "mode": "3hours",
# "position_id": "0"
# }
# },
# {},
# )
# )
# print(
# get_telem(
# {
# "queryStringParameters":{
# # "serial": "S3210639",
# "duration": "3h",
# # "datetime": "2021-07-26T06:49:29.001000Z"
# }
# }, {}
# )
# )

View File

@ -1,30 +1,8 @@
import json
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import zlib
import base64
import datetime
import os
import gzip
from io import BytesIO
import es
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
HOST = os.getenv("ES")
@ -35,27 +13,6 @@ def batch(iterable, n=1):
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
def es_request(payload, path, method):
session = boto3.Session()
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
def handler(event, context):
query = {
@ -88,7 +45,7 @@ def handler(event, context):
}
}
results = es_request(query, "telm-*/_search", "POST")
results = es.request(json.dumps(query), "telm-*/_search", "POST")
serials = [ x['key'] for x in results['aggregations']['serials']['buckets'] ]
for serial_batch in batch(serials, 10):
sqs.send_message_batch(
@ -103,7 +60,6 @@ def handler(event, context):
return [ x['key'] for x in results['aggregations']['serials']['buckets'] ]
#TODO add to SQS queue
if __name__ == "__main__":
print(handler({}, {}))
# this script will find list of sondes seen in the last 48 hours and add them to the queue to be updated (including the first and last date they were seen)

View File

@ -0,0 +1,2 @@
from . import *
print(handler({}, {}))

View File

@ -1,57 +1,9 @@
from multiprocessing import Process
import json
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import zlib
import base64
from datetime import datetime, timedelta
import os
from io import BytesIO
import gzip
HOST = os.getenv("ES")
http_session = URLLib3Session()
# get aws creds
aws_session = boto3.Session()
def mirror(path, params):
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com",
"Content-Type": "application/json", "Content-Encoding": "gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(aws_session.get_credentials(),
"es", "us-east-1").add_auth(request)
session = URLLib3Session()
r = session.send(request.prepare())
def es_request(payload, path, method):
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json",
"Content-Encoding": "gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(aws_session.get_credentials(),
"es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
r = http_session.send(request.prepare())
return json.loads(r.text)
import es
def getSonde(serial):
query = {
@ -90,7 +42,7 @@ def getSonde(serial):
}
}
}
results = es_request(query, "telm-*/_search", "POST")
results = es.request(json.dumps(query), "telm-*/_search", "POST")
return results["aggregations"]["1"]["hits"]["hits"]
@ -132,7 +84,7 @@ def getRecovered(serial):
}
}
}
results = es_request(query, "recovered*/_search", "POST")
results = es.request(json.dumps(query), "recovered*/_search", "POST")
return results["aggregations"]["1"]["hits"]["hits"]
@ -175,7 +127,7 @@ def put(event, context):
recovered['position'] = [recovered['lon'], recovered['lat']]
result = es_request(recovered, "recovered/_doc", "POST")
result = es.request(json.dumps(recovered), "recovered/_doc", "POST")
# add in elasticsearch extra position field
return {"statusCode": 200, "body": json.dumps({"message": "telm logged. Have a good day ^_^"})}
@ -282,7 +234,7 @@ def get(event, context):
}
if serials:
query["query"]["bool"]["minimum_should_match"] = 1
results = es_request(query, "recovered*/_search", "POST")
results = es.request(json.dumps(query), "recovered*/_search", "POST")
output = [x['1']['hits']['hits'][0]["_source"]
for x in results['aggregations']['2']['buckets']]
return {"statusCode": 200, "body": json.dumps(output)}
@ -398,7 +350,7 @@ def stats(event, context):
}
}
}
results = es_request(query, "recovered*/_search", "POST")
results = es.request(json.dumps(query), "recovered*/_search", "POST")
output = {
"total": 0,
@ -435,59 +387,3 @@ def stats(event, context):
return {"statusCode": 200, "body": json.dumps(output)}
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /recovered",
"rawPath": "/recovered",
"rawQueryString": "",
"queryStringParameters": {
# "datetime": "2021-12-20T00:00",
"duration": 1000000
},
"headers": {
"accept": "*/*",
"accept-encoding": "deflate",
"content-encoding": "",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": json.dumps({
"datetime": "2021-06-06T01:10:07.629Z",
"serial": "string",
"lat": 0,
"lon": 0,
"alt": 0,
"recovered": True,
"recovered_by": "string",
"description": "string"
}),
"isBase64Encoded": False,
}
# print(put(payload, {}))
print(stats(payload, {}))

View File

@ -0,0 +1,60 @@
from . import *
payload = {
"version": "2.0",
"routeKey": "PUT /recovered",
"rawPath": "/recovered",
"rawQueryString": "",
"queryStringParameters": {
# "datetime": "2021-12-20T00:00",
# "duration": 1000000
"lat": "-32.7933",
"lon": "151.835",
"distance": "30000000"
},
"headers": {
"accept": "*/*",
"accept-encoding": "deflate",
"content-encoding": "",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": json.dumps({
"datetime": "2021-06-06T01:10:07.629Z",
"serial": "string",
"lat": 0,
"lon": 0,
"alt": 0,
"recovered": True,
"recovered_by": "string",
"description": "string"
}),
"isBase64Encoded": False,
}
# print(put(payload, {}))
print(get(payload, {}))

View File

@ -1,34 +1,10 @@
import boto3
import botocore.credentials
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import json
import os
from datetime import datetime, timedelta, timezone
import sys
import traceback
import http.client
import math
import logging
import gzip
from io import BytesIO
import base64
HOST = os.getenv("ES")
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
r = http_session.send(request.prepare())
import es
def predict(event, context):
path = "reverse-prediction-*/_search"
@ -145,7 +121,7 @@ def predict(event, context):
# for single sonde allow longer predictions
payload['query']['bool']['filter'].pop(0)
logging.debug("Start ES Request")
results = es_request(payload, path, "GET")
results = es.request(json.dumps(payload), path, "GET")
logging.debug("Finished ES Request")
output = {x['1']['hits']['hits'][0]['_source']['serial']: x['1']['hits']['hits'][0]['_source'] for x in results['aggregations']['2']['buckets']}
@ -166,47 +142,3 @@ def predict(event, context):
}
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
params = json.dumps(payload)
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(params.encode('utf-8'))
params = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json",
"Content-Encoding": "gzip"}
request = AWSRequest(
method=method, url=f"https://{HOST}/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(),
"es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,params)).start()
session = URLLib3Session()
r = session.send(request.prepare())
return json.loads(r.text)
if __name__ == "__main__":
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
# mode: 6hours
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
print(predict(
{"queryStringParameters": {
"vehicles": ""
}}, {}
))
# get list of sondes, serial, lat,lon, alt
# and current rate
# for each one, request http://predict.cusf.co.uk/api/v1/?launch_latitude=-37.8136&launch_longitude=144.9631&launch_datetime=2021-02-22T00:15:18.513413Z&launch_altitude=30000&ascent_rate=5&burst_altitude=30000.1&descent_rate=5
# have to set the burst alt slightly higher than the launch

View File

@ -0,0 +1,19 @@
from . import *
# print(get_sondes({"queryStringParameters":{"lat":"-28.22717","lon":"153.82996","distance":"50000"}}, {}))
# mode: 6hours
# type: positions
# format: json
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
print(predict(
{"queryStringParameters": {
"vehicles": ""
}}, {}
))
# get list of sondes, serial, lat,lon, alt
# and current rate
# for each one, request http://predict.cusf.co.uk/api/v1/?launch_latitude=-37.8136&launch_longitude=144.9631&launch_datetime=2021-02-22T00:15:18.513413Z&launch_altitude=30000&ascent_rate=5&burst_altitude=30000.1&descent_rate=5
# have to set the burst alt slightly higher than the launch

View File

@ -0,0 +1,2 @@
def lambda_handler(event, context):
return {"statusCode": 200, "body": "wss://ws-reader.v2.sondehub.org/"}

View File

@ -0,0 +1,2 @@
from . import *
print(lambda_handler({}, {}))

View File

@ -0,0 +1,72 @@
import sys
sys.path.append("vendor")
import json
import os
import paho.mqtt.client as mqtt
import time
client = mqtt.Client(transport="websockets")
connected_flag = False
def connect():
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.tls_set()
client.username_pw_set(username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
client.connect(os.getenv("MQTT_HOST"), 443, 5)
client.loop_start()
print("loop started")
def on_disconnect(client, userdata, rc):
global connected_flag
print("disconnected")
connected_flag=False #set flag
def on_connect(client, userdata, flags, rc):
global connected_flag
if rc==0:
print("connected")
connected_flag=True #set flag
else:
print("Bad connection Returned code=",rc)
def on_publish(client, userdata, mid):
pass
connect()
def lambda_handler(event, context):
client.loop(timeout=0.05, max_packets=1) # make sure it reconnects
for record in event['Records']:
sns_message = record["Sns"]
if type(json.loads(sns_message["Message"])) == dict:
incoming_payloads = [json.loads(sns_message["Message"])]
else:
incoming_payloads = json.loads(sns_message["Message"])
#send only the first, last and every 5th packet
payloads = [incoming_payloads[0]] + incoming_payloads[1:-1:5][1:] + [incoming_payloads[-1]]
for payload in payloads:
body = json.dumps(payload)
serial = payload['serial']
while not connected_flag:
time.sleep(0.01) # wait until connected
client.publish(
topic=f'sondes/{serial}',
payload=body,
qos=0,
retain=False
)
client.publish(
topic=f'batch',
payload=json.dumps(payloads),
qos=0,
retain=False
)
time.sleep(0.05) # give paho mqtt 100ms to send messages this could be improved on but paho mqtt is a pain to interface with

View File

@ -1,76 +1,4 @@
import sys
sys.path.append("vendor/lib/python3.9/site-packages")
import json
import os
import paho.mqtt.client as mqtt
import time
client = mqtt.Client(transport="websockets")
connected_flag = False
def connect():
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.tls_set()
client.username_pw_set(username=os.getenv("MQTT_USERNAME"), password=os.getenv("MQTT_PASSWORD"))
client.connect(os.getenv("MQTT_HOST"), 443, 5)
client.loop_start()
print("loop started")
def on_disconnect(client, userdata, rc):
global connected_flag
print("disconnected")
connected_flag=False #set flag
def on_connect(client, userdata, flags, rc):
global connected_flag
if rc==0:
print("connected")
connected_flag=True #set flag
else:
print("Bad connection Returned code=",rc)
def on_publish(client, userdata, mid):
pass
connect()
def lambda_handler(event, context):
client.loop(timeout=0.05, max_packets=1) # make sure it reconnects
for record in event['Records']:
sns_message = record["Sns"]
if type(json.loads(sns_message["Message"])) == dict:
incoming_payloads = [json.loads(sns_message["Message"])]
else:
incoming_payloads = json.loads(sns_message["Message"])
#send only the first, last and every 5th packet
payloads = [incoming_payloads[0]] + incoming_payloads[1:-1:5][1:] + [incoming_payloads[-1]]
for payload in payloads:
body = json.dumps(payload)
serial = payload['serial']
while not connected_flag:
time.sleep(0.01) # wait until connected
client.publish(
topic=f'sondes/{serial}',
payload=body,
qos=0,
retain=False
)
client.publish(
topic=f'batch',
payload=json.dumps(payloads),
qos=0,
retain=False
)
time.sleep(0.05) # give paho mqtt 100ms to send messages this could be improved on but paho mqtt is a pain to interface with
from . import *
# test event
###########
if __name__ == "__main__":

View File

View File

@ -0,0 +1,5 @@
__version__ = "1.5.1"
class MQTTException(Exception):
pass

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,78 @@
class MQTTMatcher(object):
"""Intended to manage topic filters including wildcards.
Internally, MQTTMatcher use a prefix tree (trie) to store
values associated with filters, and has an iter_match()
method to iterate efficiently over all filters that match
some topic name."""
class Node(object):
__slots__ = '_children', '_content'
def __init__(self):
self._children = {}
self._content = None
def __init__(self):
self._root = self.Node()
def __setitem__(self, key, value):
"""Add a topic filter :key to the prefix tree
and associate it to :value"""
node = self._root
for sym in key.split('/'):
node = node._children.setdefault(sym, self.Node())
node._content = value
def __getitem__(self, key):
"""Retrieve the value associated with some topic filter :key"""
try:
node = self._root
for sym in key.split('/'):
node = node._children[sym]
if node._content is None:
raise KeyError(key)
return node._content
except KeyError:
raise KeyError(key)
def __delitem__(self, key):
"""Delete the value associated with some topic filter :key"""
lst = []
try:
parent, node = None, self._root
for k in key.split('/'):
parent, node = node, node._children[k]
lst.append((parent, k, node))
# TODO
node._content = None
except KeyError:
raise KeyError(key)
else: # cleanup
for parent, k, node in reversed(lst):
if node._children or node._content is not None:
break
del parent._children[k]
def iter_match(self, topic):
"""Return an iterator on all values associated with filters
that match the :topic"""
lst = topic.split('/')
normal = not topic.startswith('$')
def rec(node, i=0):
if i == len(lst):
if node._content is not None:
yield node._content
else:
part = lst[i]
if part in node._children:
for content in rec(node._children[part], i + 1):
yield content
if '+' in node._children and (normal or i > 0):
for content in rec(node._children['+'], i + 1):
yield content
if '#' in node._children and (normal or i > 0):
content = node._children['#']._content
if content is not None:
yield content
return rec(self._root)

View File

@ -0,0 +1,43 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
class PacketTypes:
"""
Packet types class. Includes the AUTH packet for MQTT v5.0.
Holds constants for each packet type such as PacketTypes.PUBLISH
and packet name strings: PacketTypes.Names[PacketTypes.PUBLISH].
"""
indexes = range(1, 16)
# Packet types
CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
PINGREQ, PINGRESP, DISCONNECT, AUTH = indexes
# Dummy packet type for properties use - will delay only applies to will
WILLMESSAGE = 99
Names = [ "reserved", \
"Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
"Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
"Pingreq", "Pingresp", "Disconnect", "Auth"]

View File

@ -0,0 +1,409 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
import sys, struct
from .packettypes import PacketTypes
class MQTTException(Exception):
pass
class MalformedPacket(MQTTException):
pass
def writeInt16(length):
# serialize a 16 bit integer to network format
return bytearray(struct.pack("!H", length))
def readInt16(buf):
# deserialize a 16 bit integer from network format
return struct.unpack("!H", buf[:2])[0]
def writeInt32(length):
# serialize a 32 bit integer to network format
return bytearray(struct.pack("!L", length))
def readInt32(buf):
# deserialize a 32 bit integer from network format
return struct.unpack("!L", buf[:4])[0]
def writeUTF(data):
# data could be a string, or bytes. If string, encode into bytes with utf-8
if sys.version_info[0] < 3:
data = bytearray(data, 'utf-8')
else:
data = data if type(data) == type(b"") else bytes(data, "utf-8")
return writeInt16(len(data)) + data
def readUTF(buffer, maxlen):
if maxlen >= 2:
length = readInt16(buffer)
else:
raise MalformedPacket("Not enough data to read string length")
maxlen -= 2
if length > maxlen:
raise MalformedPacket("Length delimited string too long")
buf = buffer[2:2+length].decode("utf-8")
# look for chars which are invalid for MQTT
for c in buf: # look for D800-DFFF in the UTF string
ord_c = ord(c)
if ord_c >= 0xD800 and ord_c <= 0xDFFF:
raise MalformedPacket("[MQTT-1.5.4-1] D800-DFFF found in UTF-8 data")
if ord_c == 0x00: # look for null in the UTF string
raise MalformedPacket("[MQTT-1.5.4-2] Null found in UTF-8 data")
if ord_c == 0xFEFF:
raise MalformedPacket("[MQTT-1.5.4-3] U+FEFF in UTF-8 data")
return buf, length+2
def writeBytes(buffer):
return writeInt16(len(buffer)) + buffer
def readBytes(buffer):
length = readInt16(buffer)
return buffer[2:2+length], length+2
class VariableByteIntegers: # Variable Byte Integer
"""
MQTT variable byte integer helper class. Used
in several places in MQTT v5.0 properties.
"""
@staticmethod
def encode(x):
"""
Convert an integer 0 <= x <= 268435455 into multi-byte format.
Returns the buffer convered from the integer.
"""
assert 0 <= x <= 268435455
buffer = b''
while 1:
digit = x % 128
x //= 128
if x > 0:
digit |= 0x80
if sys.version_info[0] >= 3:
buffer += bytes([digit])
else:
buffer += bytes(chr(digit))
if x == 0:
break
return buffer
@staticmethod
def decode(buffer):
"""
Get the value of a multi-byte integer from a buffer
Return the value, and the number of bytes used.
[MQTT-1.5.5-1] the encoded value MUST use the minimum number of bytes necessary to represent the value
"""
multiplier = 1
value = 0
bytes = 0
while 1:
bytes += 1
digit = buffer[0]
buffer = buffer[1:]
value += (digit & 127) * multiplier
if digit & 128 == 0:
break
multiplier *= 128
return (value, bytes)
class Properties(object):
"""MQTT v5.0 properties class.
See Properties.names for a list of accepted property names along with their numeric values.
See Properties.properties for the data type of each property.
Example of use:
publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.UserProperty = ("a", "2")
publish_properties.UserProperty = ("c", "3")
First the object is created with packet type as argument, no properties will be present at
this point. Then properties are added as attributes, the name of which is the string property
name without the spaces.
"""
def __init__(self, packetType):
self.packetType = packetType
self.types = ["Byte", "Two Byte Integer", "Four Byte Integer", "Variable Byte Integer",
"Binary Data", "UTF-8 Encoded String", "UTF-8 String Pair"]
self.names = {
"Payload Format Indicator": 1,
"Message Expiry Interval": 2,
"Content Type": 3,
"Response Topic": 8,
"Correlation Data": 9,
"Subscription Identifier": 11,
"Session Expiry Interval": 17,
"Assigned Client Identifier": 18,
"Server Keep Alive": 19,
"Authentication Method": 21,
"Authentication Data": 22,
"Request Problem Information": 23,
"Will Delay Interval": 24,
"Request Response Information": 25,
"Response Information": 26,
"Server Reference": 28,
"Reason String": 31,
"Receive Maximum": 33,
"Topic Alias Maximum": 34,
"Topic Alias": 35,
"Maximum QoS": 36,
"Retain Available": 37,
"User Property": 38,
"Maximum Packet Size": 39,
"Wildcard Subscription Available": 40,
"Subscription Identifier Available": 41,
"Shared Subscription Available": 42
}
self.properties = {
# id: type, packets
# payload format indicator
1: (self.types.index("Byte"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
2: (self.types.index("Four Byte Integer"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
3: (self.types.index("UTF-8 Encoded String"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
8: (self.types.index("UTF-8 Encoded String"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
9: (self.types.index("Binary Data"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
11: (self.types.index("Variable Byte Integer"),
[PacketTypes.PUBLISH, PacketTypes.SUBSCRIBE]),
17: (self.types.index("Four Byte Integer"),
[PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.DISCONNECT]),
18: (self.types.index("UTF-8 Encoded String"), [PacketTypes.CONNACK]),
19: (self.types.index("Two Byte Integer"), [PacketTypes.CONNACK]),
21: (self.types.index("UTF-8 Encoded String"),
[PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.AUTH]),
22: (self.types.index("Binary Data"),
[PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.AUTH]),
23: (self.types.index("Byte"),
[PacketTypes.CONNECT]),
24: (self.types.index("Four Byte Integer"), [PacketTypes.WILLMESSAGE]),
25: (self.types.index("Byte"), [PacketTypes.CONNECT]),
26: (self.types.index("UTF-8 Encoded String"), [PacketTypes.CONNACK]),
28: (self.types.index("UTF-8 Encoded String"),
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]),
31: (self.types.index("UTF-8 Encoded String"),
[PacketTypes.CONNACK, PacketTypes.PUBACK, PacketTypes.PUBREC,
PacketTypes.PUBREL, PacketTypes.PUBCOMP, PacketTypes.SUBACK,
PacketTypes.UNSUBACK, PacketTypes.DISCONNECT, PacketTypes.AUTH]),
33: (self.types.index("Two Byte Integer"),
[PacketTypes.CONNECT, PacketTypes.CONNACK]),
34: (self.types.index("Two Byte Integer"),
[PacketTypes.CONNECT, PacketTypes.CONNACK]),
35: (self.types.index("Two Byte Integer"), [PacketTypes.PUBLISH]),
36: (self.types.index("Byte"), [PacketTypes.CONNACK]),
37: (self.types.index("Byte"), [PacketTypes.CONNACK]),
38: (self.types.index("UTF-8 String Pair"),
[PacketTypes.CONNECT, PacketTypes.CONNACK,
PacketTypes.PUBLISH, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.PUBREL, PacketTypes.PUBCOMP,
PacketTypes.SUBSCRIBE, PacketTypes.SUBACK,
PacketTypes.UNSUBSCRIBE, PacketTypes.UNSUBACK,
PacketTypes.DISCONNECT, PacketTypes.AUTH, PacketTypes.WILLMESSAGE]),
39: (self.types.index("Four Byte Integer"),
[PacketTypes.CONNECT, PacketTypes.CONNACK]),
40: (self.types.index("Byte"), [PacketTypes.CONNACK]),
41: (self.types.index("Byte"), [PacketTypes.CONNACK]),
42: (self.types.index("Byte"), [PacketTypes.CONNACK]),
}
def allowsMultiple(self, compressedName):
return self.getIdentFromName(compressedName) in [11, 38]
def getIdentFromName(self, compressedName):
# return the identifier corresponding to the property name
result = -1
for name in self.names.keys():
if compressedName == name.replace(' ', ''):
result = self.names[name]
break
return result
def __setattr__(self, name, value):
name = name.replace(' ', '')
privateVars = ["packetType", "types", "names", "properties"]
if name in privateVars:
object.__setattr__(self, name, value)
else:
# the name could have spaces in, or not. Remove spaces before assignment
if name not in [aname.replace(' ', '') for aname in self.names.keys()]:
raise MQTTException(
"Property name must be one of "+str(self.names.keys()))
# check that this attribute applies to the packet type
if self.packetType not in self.properties[self.getIdentFromName(name)][1]:
raise MQTTException("Property %s does not apply to packet type %s"
% (name, PacketTypes.Names[self.packetType]))
if self.allowsMultiple(name):
if type(value) != type([]):
value = [value]
if hasattr(self, name):
value = object.__getattribute__(self, name) + value
object.__setattr__(self, name, value)
def __str__(self):
buffer = "["
first = True
for name in self.names.keys():
compressedName = name.replace(' ', '')
if hasattr(self, compressedName):
if not first:
buffer += ", "
buffer += compressedName + " : " + \
str(getattr(self, compressedName))
first = False
buffer += "]"
return buffer
def json(self):
data = {}
for name in self.names.keys():
compressedName = name.replace(' ', '')
if hasattr(self, compressedName):
data[compressedName] = getattr(self, compressedName)
return data
def isEmpty(self):
rc = True
for name in self.names.keys():
compressedName = name.replace(' ', '')
if hasattr(self, compressedName):
rc = False
break
return rc
def clear(self):
for name in self.names.keys():
compressedName = name.replace(' ', '')
if hasattr(self, compressedName):
delattr(self, compressedName)
def writeProperty(self, identifier, type, value):
buffer = b""
buffer += VariableByteIntegers.encode(identifier) # identifier
if type == self.types.index("Byte"): # value
if sys.version_info[0] < 3:
buffer += chr(value)
else:
buffer += bytes([value])
elif type == self.types.index("Two Byte Integer"):
buffer += writeInt16(value)
elif type == self.types.index("Four Byte Integer"):
buffer += writeInt32(value)
elif type == self.types.index("Variable Byte Integer"):
buffer += VariableByteIntegers.encode(value)
elif type == self.types.index("Binary Data"):
buffer += writeBytes(value)
elif type == self.types.index("UTF-8 Encoded String"):
buffer += writeUTF(value)
elif type == self.types.index("UTF-8 String Pair"):
buffer += writeUTF(value[0]) + writeUTF(value[1])
return buffer
def pack(self):
# serialize properties into buffer for sending over network
buffer = b""
for name in self.names.keys():
compressedName = name.replace(' ', '')
if hasattr(self, compressedName):
identifier = self.getIdentFromName(compressedName)
attr_type = self.properties[identifier][0]
if self.allowsMultiple(compressedName):
for prop in getattr(self, compressedName):
buffer += self.writeProperty(identifier,
attr_type, prop)
else:
buffer += self.writeProperty(identifier, attr_type,
getattr(self, compressedName))
return VariableByteIntegers.encode(len(buffer)) + buffer
def readProperty(self, buffer, type, propslen):
if type == self.types.index("Byte"):
value = buffer[0]
valuelen = 1
elif type == self.types.index("Two Byte Integer"):
value = readInt16(buffer)
valuelen = 2
elif type == self.types.index("Four Byte Integer"):
value = readInt32(buffer)
valuelen = 4
elif type == self.types.index("Variable Byte Integer"):
value, valuelen = VariableByteIntegers.decode(buffer)
elif type == self.types.index("Binary Data"):
value, valuelen = readBytes(buffer)
elif type == self.types.index("UTF-8 Encoded String"):
value, valuelen = readUTF(buffer, propslen)
elif type == self.types.index("UTF-8 String Pair"):
value, valuelen = readUTF(buffer, propslen)
buffer = buffer[valuelen:] # strip the bytes used by the value
value1, valuelen1 = readUTF(buffer, propslen - valuelen)
value = (value, value1)
valuelen += valuelen1
return value, valuelen
def getNameFromIdent(self, identifier):
rc = None
for name in self.names:
if self.names[name] == identifier:
rc = name
return rc
def unpack(self, buffer):
if sys.version_info[0] < 3:
buffer = bytearray(buffer)
self.clear()
# deserialize properties into attributes from buffer received from network
propslen, VBIlen = VariableByteIntegers.decode(buffer)
buffer = buffer[VBIlen:] # strip the bytes used by the VBI
propslenleft = propslen
while propslenleft > 0: # properties length is 0 if there are none
identifier, VBIlen = VariableByteIntegers.decode(
buffer) # property identifier
buffer = buffer[VBIlen:] # strip the bytes used by the VBI
propslenleft -= VBIlen
attr_type = self.properties[identifier][0]
value, valuelen = self.readProperty(
buffer, attr_type, propslenleft)
buffer = buffer[valuelen:] # strip the bytes used by the value
propslenleft -= valuelen
propname = self.getNameFromIdent(identifier)
compressedName = propname.replace(' ', '')
if not self.allowsMultiple(compressedName) and hasattr(self, compressedName):
raise MQTTException(
"Property '%s' must not exist more than once" % property)
setattr(self, propname, value)
return self, propslen + VBIlen

View File

@ -0,0 +1,232 @@
# Copyright (c) 2014 Roger Light <roger@atchoo.org>
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Roger Light - initial API and implementation
"""
This module provides some helper functions to allow straightforward publishing
of messages in a one-shot manner. In other words, they are useful for the
situation where you have a single/multiple messages you want to publish to a
broker, then disconnect and nothing else is required.
"""
from __future__ import absolute_import
import collections
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
from . import client as paho
from .. import mqtt
def _do_publish(client):
"""Internal function"""
message = client._userdata.popleft()
if isinstance(message, dict):
client.publish(**message)
elif isinstance(message, (tuple, list)):
client.publish(*message)
else:
raise TypeError('message must be a dict, tuple, or list')
def _on_connect(client, userdata, flags, rc):
"""Internal callback"""
#pylint: disable=invalid-name, unused-argument
if rc == 0:
if len(userdata) > 0:
_do_publish(client)
else:
raise mqtt.MQTTException(paho.connack_string(rc))
def _on_publish(client, userdata, mid):
"""Internal callback"""
#pylint: disable=unused-argument
if len(userdata) == 0:
client.disconnect()
else:
_do_publish(client)
def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=paho.MQTTv311,
transport="tcp", proxy_args=None):
"""Publish multiple messages to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a
list of messages. Once the messages have been delivered, it disconnects
cleanly from the broker.
msgs : a list of messages to publish. Each message is either a dict or a
tuple.
If a dict, only the topic must be present. Default values will be
used for any missing arguments. The dict must be of the form:
msg = {'topic':"<topic>", 'payload':"<payload>", 'qos':<qos>,
'retain':<retain>}
topic must be present and may not be empty.
If payload is "", None or not present then a zero length payload
will be published.
If qos is not present, the default of 0 is used.
If retain is not present, the default of False is used.
If a tuple, then it must be of the form:
("<topic>", "<payload>", qos, retain)
hostname : a string containing the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
ca_certs is required, all other parameters are optional and will
default to None if not provided, which results in the client using
the default behaviour - see the paho.mqtt.client documentation.
Alternatively, tls input can be an SSLContext object, which will be
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
proxy_args: a dictionary that will be given to the client.
"""
if not isinstance(msgs, Iterable):
raise TypeError('msgs must be an iterable')
client = paho.Client(client_id=client_id, userdata=collections.deque(msgs),
protocol=protocol, transport=transport)
client.on_publish = _on_publish
client.on_connect = _on_connect
if proxy_args is not None:
client.proxy_set(**proxy_args)
if auth:
username = auth.get('username')
if username:
password = auth.get('password')
client.username_pw_set(username, password)
else:
raise KeyError("The 'username' key was not found, this is "
"required for auth")
if will is not None:
client.will_set(**will)
if tls is not None:
if isinstance(tls, dict):
insecure = tls.pop('insecure', False)
client.tls_set(**tls)
if insecure:
# Must be set *after* the `client.tls_set()` call since it sets
# up the SSL context that `client.tls_insecure_set` alters.
client.tls_insecure_set(insecure)
else:
# Assume input is SSLContext object
client.tls_set_context(tls)
client.connect(hostname, port, keepalive)
client.loop_forever()
def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):
"""Publish a single message to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a
single message. Once the message has been delivered, it disconnects cleanly
from the broker.
topic : the only required argument must be the topic string to which the
payload will be published.
payload : the payload to be published. If "" or None, a zero length payload
will be published.
qos : the qos to use when publishing, default to 0.
retain : set the message to be retained (True) or not (False).
hostname : a string containing the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
ca_certs is required, all other parameters are optional and will
default to None if not provided, which results in the client using
the default behaviour - see the paho.mqtt.client documentation.
Defaults to None, which indicates that TLS should not be used.
Alternatively, tls input can be an SSLContext object, which will be
processed using the tls_set_context method.
transport : set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
proxy_args: a dictionary that will be given to the client.
"""
msg = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain}
multiple([msg], hostname, port, client_id, keepalive, will, auth, tls,
protocol, transport, proxy_args)

View File

@ -0,0 +1,191 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
import sys
from .packettypes import PacketTypes
class ReasonCodes:
"""MQTT version 5.0 reason codes class.
See ReasonCodes.names for a list of possible numeric values along with their
names and the packets to which they apply.
"""
def __init__(self, packetType, aName="Success", identifier=-1):
"""
packetType: the type of the packet, such as PacketTypes.CONNECT that
this reason code will be used with. Some reason codes have different
names for the same identifier when used a different packet type.
aName: the String name of the reason code to be created. Ignored
if the identifier is set.
identifier: an integer value of the reason code to be created.
"""
self.packetType = packetType
self.names = {
0: {"Success": [PacketTypes.CONNACK, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.PUBREL, PacketTypes.PUBCOMP,
PacketTypes.UNSUBACK, PacketTypes.AUTH],
"Normal disconnection": [PacketTypes.DISCONNECT],
"Granted QoS 0": [PacketTypes.SUBACK]},
1: {"Granted QoS 1": [PacketTypes.SUBACK]},
2: {"Granted QoS 2": [PacketTypes.SUBACK]},
4: {"Disconnect with will message": [PacketTypes.DISCONNECT]},
16: {"No matching subscribers":
[PacketTypes.PUBACK, PacketTypes.PUBREC]},
17: {"No subscription found": [PacketTypes.UNSUBACK]},
24: {"Continue authentication": [PacketTypes.AUTH]},
25: {"Re-authenticate": [PacketTypes.AUTH]},
128: {"Unspecified error": [PacketTypes.CONNACK, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.UNSUBACK,
PacketTypes.DISCONNECT], },
129: {"Malformed packet":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
130: {"Protocol error":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
131: {"Implementation specific error": [PacketTypes.CONNACK,
PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.SUBACK,
PacketTypes.UNSUBACK, PacketTypes.DISCONNECT], },
132: {"Unsupported protocol version": [PacketTypes.CONNACK]},
133: {"Client identifier not valid": [PacketTypes.CONNACK]},
134: {"Bad user name or password": [PacketTypes.CONNACK]},
135: {"Not authorized": [PacketTypes.CONNACK, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.UNSUBACK,
PacketTypes.DISCONNECT], },
136: {"Server unavailable": [PacketTypes.CONNACK]},
137: {"Server busy": [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
138: {"Banned": [PacketTypes.CONNACK]},
139: {"Server shutting down": [PacketTypes.DISCONNECT]},
140: {"Bad authentication method":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
141: {"Keep alive timeout": [PacketTypes.DISCONNECT]},
142: {"Session taken over": [PacketTypes.DISCONNECT]},
143: {"Topic filter invalid":
[PacketTypes.SUBACK, PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]},
144: {"Topic name invalid":
[PacketTypes.CONNACK, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
145: {"Packet identifier in use":
[PacketTypes.PUBACK, PacketTypes.PUBREC,
PacketTypes.SUBACK, PacketTypes.UNSUBACK]},
146: {"Packet identifier not found":
[PacketTypes.PUBREL, PacketTypes.PUBCOMP]},
147: {"Receive maximum exceeded": [PacketTypes.DISCONNECT]},
148: {"Topic alias invalid": [PacketTypes.DISCONNECT]},
149: {"Packet too large": [PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
150: {"Message rate too high": [PacketTypes.DISCONNECT]},
151: {"Quota exceeded": [PacketTypes.CONNACK, PacketTypes.PUBACK,
PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.DISCONNECT], },
152: {"Administrative action": [PacketTypes.DISCONNECT]},
153: {"Payload format invalid":
[PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
154: {"Retain not supported":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
155: {"QoS not supported":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
156: {"Use another server":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
157: {"Server moved":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
158: {"Shared subscription not supported":
[PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
159: {"Connection rate exceeded":
[PacketTypes.CONNACK, PacketTypes.DISCONNECT]},
160: {"Maximum connect time":
[PacketTypes.DISCONNECT]},
161: {"Subscription identifiers not supported":
[PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
162: {"Wildcard subscription not supported":
[PacketTypes.SUBACK, PacketTypes.DISCONNECT]},
}
if identifier == -1:
if packetType == PacketTypes.DISCONNECT and aName == "Success":
aName = "Normal disconnection"
self.set(aName)
else:
self.value = identifier
self.getName() # check it's good
def __getName__(self, packetType, identifier):
"""
Get the reason code string name for a specific identifier.
The name can vary by packet type for the same identifier, which
is why the packet type is also required.
Used when displaying the reason code.
"""
assert identifier in self.names.keys(), identifier
names = self.names[identifier]
namelist = [name for name in names.keys() if packetType in names[name]]
assert len(namelist) == 1
return namelist[0]
def getId(self, name):
"""
Get the numeric id corresponding to a reason code name.
Used when setting the reason code for a packetType
check that only valid codes for the packet are set.
"""
identifier = None
for code in self.names.keys():
if name in self.names[code].keys():
if self.packetType in self.names[code][name]:
identifier = code
break
assert identifier != None, name
return identifier
def set(self, name):
self.value = self.getId(name)
def unpack(self, buffer):
c = buffer[0]
if sys.version_info[0] < 3:
c = ord(c)
name = self.__getName__(self.packetType, c)
self.value = self.getId(name)
return 1
def getName(self):
"""Returns the reason code name corresponding to the numeric value which is set.
"""
return self.__getName__(self.packetType, self.value)
def __eq__(self, other):
if isinstance(other, int):
return self.value == other
if isinstance(other, str):
return self.value == str(self)
if isinstance(other, ReasonCodes):
return self.value == other.value
return False
def __str__(self):
return self.getName()
def json(self):
return self.getName()
def pack(self):
return bytearray([self.value])

View File

@ -0,0 +1,266 @@
# Copyright (c) 2016 Roger Light <roger@atchoo.org>
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Roger Light - initial API and implementation
"""
This module provides some helper functions to allow straightforward subscribing
to topics and retrieving messages. The two functions are simple(), which
returns one or messages matching a set of topics, and callback() which allows
you to pass a callback for processing of messages.
"""
from __future__ import absolute_import
from . import client as paho
from .. import mqtt
def _on_connect(client, userdata, flags, rc):
"""Internal callback"""
if rc != 0:
raise mqtt.MQTTException(paho.connack_string(rc))
if isinstance(userdata['topics'], list):
for topic in userdata['topics']:
client.subscribe(topic, userdata['qos'])
else:
client.subscribe(userdata['topics'], userdata['qos'])
def _on_message_callback(client, userdata, message):
"""Internal callback"""
userdata['callback'](client, userdata['userdata'], message)
def _on_message_simple(client, userdata, message):
"""Internal callback"""
if userdata['msg_count'] == 0:
return
# Don't process stale retained messages if 'retained' was false
if message.retain and not userdata['retained']:
return
userdata['msg_count'] = userdata['msg_count'] - 1
if userdata['messages'] is None and userdata['msg_count'] == 0:
userdata['messages'] = message
client.disconnect()
return
userdata['messages'].append(message)
if userdata['msg_count'] == 0:
client.disconnect()
def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
"""Subscribe to a list of topics and process them in a callback function.
This function creates an MQTT client, connects to a broker and subscribes
to a list of topics. Incoming messages are processed by the user provided
callback. This is a blocking function and will never return.
callback : function of the form "on_message(client, userdata, message)" for
processing the messages received.
topics : either a string containing a single topic to subscribe to, or a
list of topics to subscribe to.
qos : the qos to use when subscribing. This is applied to all topics.
userdata : passed to the callback
hostname : a string containing the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
ca_certs is required, all other parameters are optional and will
default to None if not provided, which results in the client using
the default behaviour - see the paho.mqtt.client documentation.
Alternatively, tls input can be an SSLContext object, which will be
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
clean_session : a boolean that determines the client type. If True,
the broker will remove all information about this client
when it disconnects. If False, the client is a persistent
client and subscription information and queued messages
will be retained when the client disconnects.
Defaults to True.
proxy_args: a dictionary that will be given to the client.
"""
if qos < 0 or qos > 2:
raise ValueError('qos must be in the range 0-2')
callback_userdata = {
'callback':callback,
'topics':topics,
'qos':qos,
'userdata':userdata}
client = paho.Client(client_id=client_id, userdata=callback_userdata,
protocol=protocol, transport=transport,
clean_session=clean_session)
client.on_message = _on_message_callback
client.on_connect = _on_connect
if proxy_args is not None:
client.proxy_set(**proxy_args)
if auth:
username = auth.get('username')
if username:
password = auth.get('password')
client.username_pw_set(username, password)
else:
raise KeyError("The 'username' key was not found, this is "
"required for auth")
if will is not None:
client.will_set(**will)
if tls is not None:
if isinstance(tls, dict):
insecure = tls.pop('insecure', False)
client.tls_set(**tls)
if insecure:
# Must be set *after* the `client.tls_set()` call since it sets
# up the SSL context that `client.tls_insecure_set` alters.
client.tls_insecure_set(insecure)
else:
# Assume input is SSLContext object
client.tls_set_context(tls)
client.connect(hostname, port, keepalive)
client.loop_forever()
def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
"""Subscribe to a list of topics and return msg_count messages.
This function creates an MQTT client, connects to a broker and subscribes
to a list of topics. Once "msg_count" messages have been received, it
disconnects cleanly from the broker and returns the messages.
topics : either a string containing a single topic to subscribe to, or a
list of topics to subscribe to.
qos : the qos to use when subscribing. This is applied to all topics.
msg_count : the number of messages to retrieve from the broker.
if msg_count == 1 then a single MQTTMessage will be returned.
if msg_count > 1 then a list of MQTTMessages will be returned.
retained : If set to True, retained messages will be processed the same as
non-retained messages. If set to False, retained messages will
be ignored. This means that with retained=False and msg_count=1,
the function will return the first message received that does
not have the retained flag set.
hostname : a string containing the address of the broker to connect to.
Defaults to localhost.
port : the port to connect to the broker on. Defaults to 1883.
client_id : the MQTT client id to use. If "" or None, the Paho library will
generate a client id automatically.
keepalive : the keepalive timeout value for the client. Defaults to 60
seconds.
will : a dict containing will parameters for the client: will = {'topic':
"<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
Topic is required, all other parameters are optional and will
default to None, 0 and False respectively.
Defaults to None, which indicates no will should be used.
auth : a dict containing authentication parameters for the client:
auth = {'username':"<username>", 'password':"<password>"}
Username is required, password is optional and will default to None
if not provided.
Defaults to None, which indicates no authentication is to be used.
tls : a dict containing TLS configuration parameters for the client:
dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">, 'insecure':"<bool>"}
ca_certs is required, all other parameters are optional and will
default to None if not provided, which results in the client using
the default behaviour - see the paho.mqtt.client documentation.
Alternatively, tls input can be an SSLContext object, which will be
processed using the tls_set_context method.
Defaults to None, which indicates that TLS should not be used.
transport : set to "tcp" to use the default setting of transport which is
raw TCP. Set to "websockets" to use WebSockets as the transport.
clean_session : a boolean that determines the client type. If True,
the broker will remove all information about this client
when it disconnects. If False, the client is a persistent
client and subscription information and queued messages
will be retained when the client disconnects.
Defaults to True.
proxy_args: a dictionary that will be given to the client.
"""
if msg_count < 1:
raise ValueError('msg_count must be > 0')
# Set ourselves up to return a single message if msg_count == 1, or a list
# if > 1.
if msg_count == 1:
messages = None
else:
messages = []
userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages}
callback(_on_message_simple, topics, qos, userdata, hostname, port,
client_id, keepalive, will, auth, tls, protocol, transport,
clean_session, proxy_args)
return userdata['messages']

View File

@ -0,0 +1,110 @@
"""
*******************************************************************
Copyright (c) 2017, 2019 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
import sys
class MQTTException(Exception):
pass
class SubscribeOptions(object):
"""The MQTT v5.0 subscribe options class.
The options are:
qos: As in MQTT v3.1.1.
noLocal: True or False. If set to True, the subscriber will not receive its own publications.
retainAsPublished: True or False. If set to True, the retain flag on received publications will be as set
by the publisher.
retainHandling: RETAIN_SEND_ON_SUBSCRIBE, RETAIN_SEND_IF_NEW_SUB or RETAIN_DO_NOT_SEND
Controls when the broker should send retained messages:
- RETAIN_SEND_ON_SUBSCRIBE: on any successful subscribe request
- RETAIN_SEND_IF_NEW_SUB: only if the subscribe request is new
- RETAIN_DO_NOT_SEND: never send retained messages
"""
# retain handling options
RETAIN_SEND_ON_SUBSCRIBE, RETAIN_SEND_IF_NEW_SUB, RETAIN_DO_NOT_SEND = range(
0, 3)
def __init__(self, qos=0, noLocal=False, retainAsPublished=False, retainHandling=RETAIN_SEND_ON_SUBSCRIBE):
"""
qos: 0, 1 or 2. 0 is the default.
noLocal: True or False. False is the default and corresponds to MQTT v3.1.1 behavior.
retainAsPublished: True or False. False is the default and corresponds to MQTT v3.1.1 behavior.
retainHandling: RETAIN_SEND_ON_SUBSCRIBE, RETAIN_SEND_IF_NEW_SUB or RETAIN_DO_NOT_SEND
RETAIN_SEND_ON_SUBSCRIBE is the default and corresponds to MQTT v3.1.1 behavior.
"""
object.__setattr__(self, "names",
["QoS", "noLocal", "retainAsPublished", "retainHandling"])
self.QoS = qos # bits 0,1
self.noLocal = noLocal # bit 2
self.retainAsPublished = retainAsPublished # bit 3
self.retainHandling = retainHandling # bits 4 and 5: 0, 1 or 2
assert self.QoS in [0, 1, 2]
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2"
def __setattr__(self, name, value):
if name not in self.names:
raise MQTTException(
name + " Attribute name must be one of "+str(self.names))
object.__setattr__(self, name, value)
def pack(self):
assert self.QoS in [0, 1, 2]
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2"
noLocal = 1 if self.noLocal else 0
retainAsPublished = 1 if self.retainAsPublished else 0
data = [(self.retainHandling << 4) | (retainAsPublished << 3) |
(noLocal << 2) | self.QoS]
if sys.version_info[0] >= 3:
buffer = bytes(data)
else:
buffer = bytearray(data)
return buffer
def unpack(self, buffer):
b0 = buffer[0]
self.retainHandling = ((b0 >> 4) & 0x03)
self.retainAsPublished = True if ((b0 >> 3) & 0x01) == 1 else False
self.noLocal = True if ((b0 >> 2) & 0x01) == 1 else False
self.QoS = (b0 & 0x03)
assert self.retainHandling in [
0, 1, 2], "Retain handling should be 0, 1 or 2, not %d" % self.retainHandling
assert self.QoS in [
0, 1, 2], "QoS should be 0, 1 or 2, not %d" % self.QoS
return 1
def __repr__(self):
return str(self)
def __str__(self):
return "{QoS="+str(self.QoS)+", noLocal="+str(self.noLocal) +\
", retainAsPublished="+str(self.retainAsPublished) +\
", retainHandling="+str(self.retainHandling)+"}"
def json(self):
data = {
"QoS": self.QoS,
"noLocal": self.noLocal,
"retainAsPublished": self.retainAsPublished,
"retainHandling": self.retainHandling,
}
return data

View File

@ -1,10 +1,8 @@
import sys
import json
import boto3
import zlib
import base64
import datetime
import functools
from email.utils import parsedate
import os
import re
@ -367,45 +365,3 @@ def lambda_handler(event, context):
else:
return {"statusCode": 200, "body": "^v^ telm logged"}
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-encoding": "gzip",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": "H4sIAFsEMmEAA+XaS2/jNhAA4Pv+CiNnh+X74fOiLYr2UvTUojC0MbMRYEuuLDtdFP3vHWlFKiapmMAaEFAtkMOOKVkjfiGHZP74sFr9Az+r1cOpfm5fi8Zuq+JgHzarh6bYlfWprnZ2W5zbetv8/bAOml5scyrrqmtNkEACGnz42uR83NfFzjbbp2K/P5Wf+zYffyY//fjrI+nbXTcrqtZWVdHf6Tu+ei0udnWoq/pY76372rY82G1jn2x5sbuuJcWUPGLziNlvWG4E3giOhDBaqt/dNbuitd116eYM4f6fb34oqvNz8dSeG9t0l/zQFK/+678c+9t8/P4X/yLOn95EsfFx25TFvk9GY40Zx+6T5+br2yWMMGYIFmz4YF+0EOYKaUYUd8H+3RKMtGGaDMFi37UkUmvEqXs2ezhCUCE5BC52v71AhCLC34ReIAT9pIfQi4U+rj5DUENDd7NT0Z4gZIb/firatr+MuTu9nA/lrmy/QPSR+MTsX2dbPXVBjikS2F1/qpo+Yf9sx/pUtgMbn7BL0r2oUUafruAC4v+u/2deBeKSSZzrlc/vlae8imuvtO9KE3g1GBl+2ytLeFWRV6UQY7leb9qEb522KZxNsyybCmwaTmSmTTm/TZmyqa9tdl3JsQptSsQjmyKyyWObOLapCSL6bjYZItM29doltCCbZIMZvCcqlcixCc3p7DYlTdjUKmFTBDYVjE5c3rYpTGCTI8oimwpn2KS5NumkTEjNpbMsmfAM0NtaZ8qcf0aXqRndJGD6qtTDVMiwECaPYZIAph4JXg2avuG3w2TvTOhm7fJZlkyY4qDQNlm1Zg95dpkiJZMkaGoW0KQYMXKbJglpCkRjmhKqdHM3mgSpaZpk7RJals1uKauFzJ3P1fw2VcqmiG2KUKZAMkOmkIFMPRafbwZNjPDNVXv+bO6fIyFzWAWJZbk0SDAmNcl0qed3qVMuE3Xm2PtephrVTcqEbqGBTIFimJIgKu4I850hcyg0BV7S8pxsYB4RHLPcQhPe0dw0FY5p0gRMGg6ZjCA/jL4D04QLIIlMos5Uo+B7bBxNLs7p2qWzLJfdLglXima6JPO7JAmXOFFmChaugBgfdyG9TBbIpIiJSGZi20gBpptl5l1k4qHMhISWZZMjLnH2EojMfzykEsdDFMuETT+oeZsaaZ1hk0U2ZcImLIzU3WxOrs271Fw6y5IJi3NFlOSZMudfnKvE4pzixL6R8CWpk8kZ0tHBZSyThIUmLIFIJFOYsTj4dpnYb6wmbA4bR5DQsmxC5yiuOM60Of9BkEocBFGSqjVVZFMmNo5imyIcNRXyo/LVjK7veUg5uQrqknMJLcsmjAnKYJa5cUTm3zhSwcaR7ruPJWzq8CCIG/hFvG3ThIeUciwErv7gQ93P5vSMDh+5dN6X+ed/QRi1DYwkAAA=",
"isBase64Encoded": True,
}
print(lambda_handler(payload, {}))

View File

@ -0,0 +1,42 @@
from . import *
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-encoding": "gzip",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": "H4sIAFsEMmEAA+XaS2/jNhAA4Pv+CiNnh+X74fOiLYr2UvTUojC0MbMRYEuuLDtdFP3vHWlFKiapmMAaEFAtkMOOKVkjfiGHZP74sFr9Az+r1cOpfm5fi8Zuq+JgHzarh6bYlfWprnZ2W5zbetv8/bAOml5scyrrqmtNkEACGnz42uR83NfFzjbbp2K/P5Wf+zYffyY//fjrI+nbXTcrqtZWVdHf6Tu+ei0udnWoq/pY76372rY82G1jn2x5sbuuJcWUPGLziNlvWG4E3giOhDBaqt/dNbuitd116eYM4f6fb34oqvNz8dSeG9t0l/zQFK/+678c+9t8/P4X/yLOn95EsfFx25TFvk9GY40Zx+6T5+br2yWMMGYIFmz4YF+0EOYKaUYUd8H+3RKMtGGaDMFi37UkUmvEqXs2ezhCUCE5BC52v71AhCLC34ReIAT9pIfQi4U+rj5DUENDd7NT0Z4gZIb/firatr+MuTu9nA/lrmy/QPSR+MTsX2dbPXVBjikS2F1/qpo+Yf9sx/pUtgMbn7BL0r2oUUafruAC4v+u/2deBeKSSZzrlc/vlae8imuvtO9KE3g1GBl+2ytLeFWRV6UQY7leb9qEb522KZxNsyybCmwaTmSmTTm/TZmyqa9tdl3JsQptSsQjmyKyyWObOLapCSL6bjYZItM29doltCCbZIMZvCcqlcixCc3p7DYlTdjUKmFTBDYVjE5c3rYpTGCTI8oimwpn2KS5NumkTEjNpbMsmfAM0NtaZ8qcf0aXqRndJGD6qtTDVMiwECaPYZIAph4JXg2avuG3w2TvTOhm7fJZlkyY4qDQNlm1Zg95dpkiJZMkaGoW0KQYMXKbJglpCkRjmhKqdHM3mgSpaZpk7RJals1uKauFzJ3P1fw2VcqmiG2KUKZAMkOmkIFMPRafbwZNjPDNVXv+bO6fIyFzWAWJZbk0SDAmNcl0qed3qVMuE3Xm2PtephrVTcqEbqGBTIFimJIgKu4I850hcyg0BV7S8pxsYB4RHLPcQhPe0dw0FY5p0gRMGg6ZjCA/jL4D04QLIIlMos5Uo+B7bBxNLs7p2qWzLJfdLglXima6JPO7JAmXOFFmChaugBgfdyG9TBbIpIiJSGZi20gBpptl5l1k4qHMhISWZZMjLnH2EojMfzykEsdDFMuETT+oeZsaaZ1hk0U2ZcImLIzU3WxOrs271Fw6y5IJi3NFlOSZMudfnKvE4pzixL6R8CWpk8kZ0tHBZSyThIUmLIFIJFOYsTj4dpnYb6wmbA4bR5DQsmxC5yiuOM60Of9BkEocBFGSqjVVZFMmNo5imyIcNRXyo/LVjK7veUg5uQrqknMJLcsmjAnKYJa5cUTm3zhSwcaR7ruPJWzq8CCIG/hFvG3ThIeUciwErv7gQ93P5vSMDh+5dN6X+ed/QRi1DYwkAAA=",
"isBase64Encoded": True,
}
print(lambda_handler(payload, {}))

View File

@ -1,33 +1,6 @@
import json
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import boto3
import botocore.credentials
import os
from io import BytesIO
import gzip
null = None
HOST = os.getenv("ES")
http_session = URLLib3Session()
import es
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
r = http_session.send(request.prepare())
if r.status_code != 200:
raise RuntimeError
return json.loads(r.text)
def lambda_handler(event, context):
@ -52,7 +25,7 @@ def lambda_handler(event, context):
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
body += "\n"
result = es_request(body, f"telm-{index}/_doc/_bulk", "POST")
result = es.request(body, f"telm-{index}/_doc/_bulk", "POST")
if 'errors' in result and result['errors'] == True:
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
error_types = [a for a in error_types if a != 'mapper_parsing_exception'] # filter out mapper failures since they will never succeed

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,56 @@
import json
import zlib
import base64
import datetime
from email.utils import parsedate
import es
def lambda_handler(event, context):
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcfromtimestamp(event["requestContext"]["timeEpoch"]/1000)
).total_seconds()
except:
pass
try:
payload = json.loads(event["body"])
except:
return {"statusCode": 400, "body": "JSON decode issue"}
print(payload)
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
if time_delta:
payload["upload_time_delta"] = time_delta
payload.pop("uploader_contact_email", None)
# clean up None reports
if "uploader_position" in payload and None == payload["uploader_position"] or None in payload["uploader_position"]:
payload.pop("uploader_position", None)
if "uploader_position" in payload:
(payload["uploader_alt"], payload["uploader_position_elk"]) = (
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
index = datetime.datetime.utcnow().strftime("listeners-%Y-%m")
payload["ts"] = datetime.datetime.utcnow().isoformat()
es.request(json.dumps(payload),f"{index}/_doc","POST")
return {"statusCode": 200, "body": "^v^ telm logged"}

View File

@ -0,0 +1,43 @@
from . import *
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": """
{"software_name": "radiosonde_auto_rx", "software_version": "1.5.8-beta2", "uploader_callsign": "LZ3DJ-18", "uploader_position": [null,null,null], "uploader_antenna": "Dipole", "uploader_contact_email": "none@none.com", "mobile": false}
""",
"isBase64Encoded": False,
}
print(lambda_handler(payload, {}))

View File

@ -0,0 +1,21 @@
import boto3
import json
import re
MATCH_OBJECT = re.compile(r"^gfs.\d{8}/\d{2}/atmos/gfs.t\d{2}z.pgrb2.0p50.f192$")
BUCKET = 'noaa-gfs-bdp-pds'
SERVICE_NAME="tawhiri"
CLUSTER_NAME="Tawhiri"
ecs = boto3.client('ecs', region_name="us-east-1")
def handler(event, context):
for record in event["Records"]:
message = json.loads(record["Sns"]["Message"])
for inner_record in message['Records']:
if "ObjectCreated" in inner_record['eventName']:
if inner_record['s3']['bucket']['name'] == BUCKET:
print(inner_record['s3']['object']['key'])
if MATCH_OBJECT.match(inner_record['s3']['object']['key']):
print(f"Found new GFS - updating service {inner_record['s3']['object']['key']}")
ecs.update_service(cluster=CLUSTER_NAME, service=SERVICE_NAME, forceNewDeployment=True)

View File

@ -1,26 +1,5 @@
import boto3
import json
import re
MATCH_OBJECT = re.compile(r"^gfs.\d{8}/\d{2}/atmos/gfs.t\d{2}z.pgrb2.0p50.f192$")
BUCKET = 'noaa-gfs-bdp-pds'
SERVICE_NAME="tawhiri"
CLUSTER_NAME="Tawhiri"
ecs = boto3.client('ecs', region_name="us-east-1")
def handler(event, context):
for record in event["Records"]:
message = json.loads(record["Sns"]["Message"])
for inner_record in message['Records']:
if "ObjectCreated" in inner_record['eventName']:
if inner_record['s3']['bucket']['name'] == BUCKET:
print(inner_record['s3']['object']['key'])
if MATCH_OBJECT.match(inner_record['s3']['object']['key']):
print(f"Found new GFS - updating service {inner_record['s3']['object']['key']}")
ecs.update_service(cluster=CLUSTER_NAME, service=SERVICE_NAME, forceNewDeployment=True)
if __name__ == "__main__":
handler(
from . import *
handler(
{
"Records": [
{

View File

@ -123,4 +123,10 @@ resource "aws_acm_certificate" "CertificateManagerCertificate_root" {
"*.sondehub.org"
]
validation_method = "DNS"
}
data "archive_file" "lambda" {
type = "zip"
source_dir = "lambda/"
output_path = "${path.module}/build/lambda.zip"
}

View File

@ -1,9 +1,3 @@
data "archive_file" "predict_updater" {
type = "zip"
source_file = "predict_updater/lambda_function.py"
output_path = "${path.module}/build/predict_updater.zip"
}
resource "aws_iam_role" "predict_updater" {
path = "/service-role/"
name = "predict-updater"
@ -68,9 +62,9 @@ EOF
resource "aws_lambda_function" "predict_updater" {
function_name = "predict_updater"
handler = "lambda_function.predict"
filename = "${path.module}/build/predict_updater.zip"
source_code_hash = data.archive_file.predict_updater.output_base64sha256
handler = "predict_updater.predict"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 1024
role = aws_iam_role.predict_updater.arn
@ -142,23 +136,11 @@ resource "aws_apigatewayv2_integration" "reverse_predictions" {
payload_format_version = "2.0"
}
data "archive_file" "predictions" {
type = "zip"
source_file = "predict/lambda_function.py"
output_path = "${path.module}/build/predictions.zip"
}
data "archive_file" "reverse_predictions" {
type = "zip"
source_file = "reverse-predict/lambda_function.py"
output_path = "${path.module}/build/reverse-predict.zip"
}
resource "aws_lambda_function" "predictions" {
function_name = "predictions"
handler = "lambda_function.predict"
filename = "${path.module}/build/predictions.zip"
source_code_hash = data.archive_file.predictions.output_base64sha256
handler = "predict.predict"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.basic_lambda_role.arn
@ -181,9 +163,9 @@ resource "aws_lambda_permission" "predictions" {
resource "aws_lambda_function" "reverse_predictions" {
function_name = "reverse-predictions"
handler = "lambda_function.predict"
filename = "${path.module}/build/reverse-predict.zip"
source_code_hash = data.archive_file.reverse_predictions.output_base64sha256
handler = "reverse-predict.predict"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.basic_lambda_role.arn
@ -674,17 +656,11 @@ EOF
role = aws_iam_role.predictor_update_trigger_lambda.name
}
data "archive_file" "predictor_update_trigger_lambda" {
type = "zip"
source_file = "tawhiri-updater/index.py"
output_path = "${path.module}/build/tawhiri-updater.zip"
}
resource "aws_lambda_function" "predictor_update_trigger_lambda" {
function_name = "tawhiri-updater"
handler = "index.handler"
filename = "${path.module}/build/tawhiri-updater.zip"
source_code_hash = data.archive_file.predictor_update_trigger_lambda.output_base64sha256
handler = "tawhiri-updater.handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.predictor_update_trigger_lambda.arn

View File

@ -1,25 +1,9 @@
data "archive_file" "query" {
type = "zip"
source_file = "query/lambda_function.py"
output_path = "${path.module}/build/query.zip"
}
resource "aws_lambda_function" "get_sondes" {
function_name = "query"
handler = "lambda_function.get_sondes"
filename = "${path.module}/build/query.zip"
source_code_hash = data.archive_file.query.output_base64sha256
handler = "query.get_sondes"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.basic_lambda_role.arn
@ -40,9 +24,9 @@ resource "aws_lambda_function" "get_sondes" {
resource "aws_lambda_function" "get_telem" {
function_name = "get_telem"
handler = "lambda_function.get_telem"
filename = "${path.module}/build/query.zip"
source_code_hash = data.archive_file.query.output_base64sha256
handler = "query.get_telem"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.basic_lambda_role.arn
@ -58,9 +42,9 @@ resource "aws_lambda_function" "get_telem" {
resource "aws_lambda_function" "get_sites" {
function_name = "get_sites"
handler = "lambda_function.get_sites"
filename = "${path.module}/build/query.zip"
source_code_hash = data.archive_file.query.output_base64sha256
handler = "query.get_sites"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.basic_lambda_role.arn
@ -76,9 +60,9 @@ resource "aws_lambda_function" "get_sites" {
resource "aws_lambda_function" "get_listener_telemetry" {
function_name = "get_listener_telemetry"
handler = "lambda_function.get_listener_telemetry"
filename = "${path.module}/build/query.zip"
source_code_hash = data.archive_file.query.output_base64sha256
handler = "query.get_listener_telemetry"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 256
role = aws_iam_role.basic_lambda_role.arn

View File

@ -1,8 +1,3 @@
data "archive_file" "recovered" {
type = "zip"
source_file = "recovered/lambda_function.py"
output_path = "${path.module}/build/recovered.zip"
}
resource "aws_iam_role" "recovered" {
path = "/service-role/"
@ -58,9 +53,9 @@ EOF
resource "aws_lambda_function" "recovered_get" {
function_name = "recovered_get"
handler = "lambda_function.get"
filename = "${path.module}/build/recovered.zip"
source_code_hash = data.archive_file.recovered.output_base64sha256
handler = "recovered.get"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.recovered.arn
@ -77,9 +72,9 @@ resource "aws_lambda_function" "recovered_get" {
resource "aws_lambda_function" "recovered_stats" {
function_name = "recovered_stats"
handler = "lambda_function.stats"
filename = "${path.module}/build/recovered.zip"
source_code_hash = data.archive_file.recovered.output_base64sha256
handler = "recovered.stats"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.recovered.arn
@ -96,9 +91,9 @@ resource "aws_lambda_function" "recovered_stats" {
resource "aws_lambda_function" "recovered_put" {
function_name = "recovered_put"
handler = "lambda_function.put"
filename = "${path.module}/build/recovered.zip"
source_code_hash = data.archive_file.recovered.output_base64sha256
handler = "recovered.put"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.recovered.arn

View File

@ -1,13 +0,0 @@
import boto3
import time
import uuid
import urllib.parse
import hmac, datetime, hashlib
import os
def lambda_handler(event, context):
return {"statusCode": 200, "body": "wss://ws-reader.v2.sondehub.org/"}
if __name__ == "__main__":
print(lambda_handler({}, {}))

View File

@ -1 +0,0 @@
Lambda function which reads off SQS queue and processes sonde data to S3.

View File

@ -1,106 +0,0 @@
import sys
sys.path.append("vendor/lib/python3.9/site-packages")
import json
import boto3
import os
import uuid
import hashlib
import asyncio
import aioboto3
BUCKET = "sondehub-open-data"
def values_to_hash(payload):
fields = [
"type",
"serial",
"frame",
"datetime",
"lat",
"lon",
"alt",
"subtype",
"temp",
"humidity",
"pressure",
"vel_h",
"vel_v",
"heading",
"sats",
"batt",
"burst_timer",
"xdata"
]
output = ""
for field in fields:
if field in payload:
output += str(payload[field])
return output
def set_connection_header(request, operation_name, **kwargs):
request.headers['Connection'] = 'keep-alive'
async def upload(event, context):
async with aioboto3.client("s3") as s3:
s3.meta.events.register('request-created.s3', set_connection_header)
tasks = []
payloads = {}
for record in event['Records']:
sns_message = json.loads(record["body"])
if type(json.loads(sns_message["Message"])) == dict:
incoming_payloads = [json.loads(sns_message["Message"])]
else:
incoming_payloads = json.loads(sns_message["Message"])
for payload in incoming_payloads:
body = json.dumps(payload)
id = str(uuid.uuid4())
hash = hashlib.sha256(values_to_hash(payload).encode("utf-8")).hexdigest()
filenames = [
f"date/{payload['datetime']}-{payload['serial']}-{id}.json",
f"serial/{payload['serial']}/{payload['datetime']}-{id}.json",
f"serial-hashed/{payload['serial']}/{payload['datetime']}-{hash}.json"
]
for filename in filenames:
tasks.append(s3.put_object(
ACL="public-read",
Bucket=BUCKET,
Body=body,
Key=filename
))
await asyncio.gather(*tasks)
def lambda_handler(event, context):
asyncio.run(upload(event, context))
# test event
###########
if __name__ == "__main__":
demo_event = {
"Records": [
{
"messageId": "262d4090-e23b-4907-b677-3c94334dc899",
"receiptHandle": "AQEBL1FXHS4m+Om59KZH9ayxC5VBqDEDh6DgXUZuBhV2uQJS312bhOTpLvptuCCIWaeLkfHU+7NajqV2kTVhnz5lehE/zfQ8OU1jqqm+cHxyul99MxA7K7+C+ww2Ri9KSbgaAgqvZzcLbwpW8rP0MNhrBcIQAE5Pz1urfTZKx1RVnv/XQHbR2ARPwocOzk2yEexa0y2f7FedS4F10gju8Ypp0Zr4DSRb1zUkES3QJGiSJakaO1QJT5npRySjAd0CUSPXw7IDTejolfGkItQG5eMRx0enELTUDv8LPsHJkr7ha3DHNfbvxTtdk406nWFn8U8DW515emp7+Y+AD469OnceIMdVC62GHwrpMkedXzLEH0C8TOXHQ+WuRkhR1dauwKqO",
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"65147554-e06d-5324-a87d-2da107fea807\",\n \"TopicArn\" : \"arn:aws:sns:us-east-1:143841941773:sonde-telem\",\n \"Message\" : \"{\\\"software_name\\\":\\\"radiosonde_auto_rx\\\",\\\"software_version\\\":\\\"1.5.1\\\",\\\"uploader_callsign\\\":\\\"BIOWL1\\\",\\\"uploader_position\\\":\\\"52.014417,8.47351\\\",\\\"uploader_antenna\\\":\\\"SirioCX395\\\",\\\"time_received\\\":\\\"2021-04-18T07:52:37.196266Z\\\",\\\"datetime\\\":\\\"2021-04-18T07:52:53.001000Z\\\",\\\"manufacturer\\\":\\\"Vaisala\\\",\\\"type\\\":\\\"RS41\\\",\\\"serial\\\":\\\"meowmeowtest\\\",\\\"subtype\\\":\\\"RS41-SGP\\\",\\\"frame\\\":12781,\\\"lat\\\":50.65064,\\\"lon\\\":6.60805,\\\"alt\\\":2954.44289,\\\"temp\\\":-9.3,\\\"humidity\\\":75.4,\\\"pressure\\\":709.79,\\\"vel_v\\\":-2.85326,\\\"vel_h\\\":8.53055,\\\"heading\\\":236.0122,\\\"sats\\\":9,\\\"batt\\\":2.7,\\\"frequency\\\":405.3,\\\"burst_timer\\\":25423,\\\"snr\\\":12.5,\\\"user-agent\\\":\\\"Amazon CloudFront\\\",\\\"position\\\":\\\"50.65064,6.60805\\\",\\\"upload_time_delta\\\":-0.713689,\\\"uploader_alt\\\":340}\",\n \"Timestamp\" : \"2021-04-18T07:52:51.776Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"qXuYwDAGPYYLjKXfDtF69AWKDEhhz9MXlqxO2nBwJ/dgOqNSUZtDPqOYSuge3jVCoTSRY5qGw38gg2G+JnEbJd8SVvp9GRsFre8MKWu8T0obq3rj8S0YAh7dTqi4EILIMmi2KziasCDQlrVuZvCSgPnC+hYF3GByI626QW6m3a4E2igclvbE+O6x6qvVDKwmf/eh+8LRiH1PCrEckiXthnr+qOCiTcstyZoOqMOShJBun9k0DK07+Yf1tYDPSHnqZSIaOvAMSjIKKXfGCkel3SWieO7Zgk7xQuo9Z1bcV8Miu4uEvge4G9HKU3S41zaVcQjYvEhQLxxgd1x3HxXImA==\",\n \"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:143841941773:sonde-telem:1a52ac41-6e17-43da-bfb6-114577c94ca6\"\n}",
"attributes": {
"ApproximateReceiveCount": "2",
"SentTimestamp": "1618732371814",
"SenderId": "AIDAIT2UOQQY3AUEKVGXU",
"ApproximateFirstReceiveTimestamp": "1618732640317"
},
"messageAttributes": {},
"md5OfMessageAttributes": None,
"md5OfBody": "a0191fc5ea3705340c088e457c31095b",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:143841941773:to-elk",
"awsRegion": "us-east-1"
}
]
}
lambda_handler(demo_event, {})

View File

@ -1,9 +1,3 @@
data "archive_file" "sqs_to_elk" {
type = "zip"
source_file = "sqs-to-elk/lambda_function.py"
output_path = "${path.module}/build/sqs-to-elk.zip"
}
resource "aws_iam_role" "sqs_to_elk" {
path = "/service-role/"
name = "sqs-to-elk"
@ -62,9 +56,9 @@ EOF
resource "aws_lambda_function" "sqs_to_elk" {
function_name = "sqs-to-elk"
handler = "lambda_function.lambda_handler"
filename = "${path.module}/build/sqs-to-elk.zip"
source_code_hash = data.archive_file.sqs_to_elk.output_base64sha256
handler = "sqs-to-elk.lambda_handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.sqs_to_elk.arn

View File

@ -1,147 +0,0 @@
import json
import boto3
import zlib
import base64
import datetime
import functools
import uuid
import threading
from email.utils import parsedate
from botocore.awsrequest import AWSRequest
from botocore.endpoint import URLLib3Session
from botocore.auth import SigV4Auth
import boto3
import botocore.credentials
import os
from io import BytesIO
import gzip
from multiprocessing import Process
http_session = URLLib3Session()
def mirror(path,params):
session = boto3.Session()
headers = {"Host": "search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com", "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://search-sondes-v2-hiwdpmnjbuckpbwfhhx65mweee.us-east-1.es.amazonaws.com/{path}", data=params, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
r = http_session.send(request.prepare())
HOST = os.getenv("ES")
def lambda_handler(event, context):
if "isBase64Encoded" in event and event["isBase64Encoded"] == True:
event["body"] = base64.b64decode(event["body"])
if (
"content-encoding" in event["headers"]
and event["headers"]["content-encoding"] == "gzip"
):
event["body"] = zlib.decompress(event["body"], 16 + zlib.MAX_WBITS)
time_delta = None
if "date" in event["headers"]:
try:
time_delta_header = event["headers"]["date"]
time_delta = (
datetime.datetime(*parsedate(time_delta_header)[:7])
- datetime.datetime.utcfromtimestamp(event["requestContext"]["timeEpoch"]/1000)
).total_seconds()
except:
pass
try:
payload = json.loads(event["body"])
except:
return {"statusCode": 400, "body": "JSON decode issue"}
print(payload)
if "user-agent" in event["headers"]:
event["time_server"] = datetime.datetime.now().isoformat()
payload["user-agent"] = event["headers"]["user-agent"]
if time_delta:
payload["upload_time_delta"] = time_delta
payload.pop("uploader_contact_email", None)
# clean up None reports
if "uploader_position" in payload and None == payload["uploader_position"]:
payload.pop("uploader_position", None)
if "uploader_position" in payload:
(payload["uploader_alt"], payload["uploader_position_elk"]) = (
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
index = datetime.datetime.utcnow().strftime("listeners-%Y-%m")
payload["ts"] = datetime.datetime.utcnow().isoformat()
es_request(json.dumps(payload),f"{index}/_doc","POST")
return {"statusCode": 200, "body": "^v^ telm logged"}
def es_request(payload, path, method):
# get aws creds
session = boto3.Session()
compressed = BytesIO()
with gzip.GzipFile(fileobj=compressed, mode='w') as f:
f.write(payload.encode('utf-8'))
payload = compressed.getvalue()
headers = {"Host": HOST, "Content-Type": "application/json", "Content-Encoding":"gzip"}
request = AWSRequest(
method="POST", url=f"https://{HOST}/{path}", data=payload, headers=headers
)
SigV4Auth(boto3.Session().get_credentials(), "es", "us-east-1").add_auth(request)
#p = Process(target=mirror, args=(path,payload)).start()
session = URLLib3Session()
r = session.send(request.prepare())
if r.status_code != 200 and r.status_code != 201:
raise RuntimeError
return json.loads(r.text)
if __name__ == "__main__":
payload = {
"version": "2.0",
"routeKey": "PUT /sondes/telemetry",
"rawPath": "/sondes/telemetry",
"rawQueryString": "",
"headers": {
"accept": "*/*",
"accept-encoding": "gzip, deflate",
"content-length": "2135",
"content-type": "application/json",
"host": "api.v2.sondehub.org",
"user-agent": "autorx-1.4.1-beta4",
"x-amzn-trace-id": "Root=1-6015f571-6aef2e73165042d53fcc317a",
"x-forwarded-for": "103.107.130.22",
"x-forwarded-port": "443",
"x-forwarded-proto": "https",
"date": "Sun, 31 Jan 2021 00:21:45 GMT",
},
"requestContext": {
"accountId": "143841941773",
"apiId": "r03szwwq41",
"domainName": "api.v2.sondehub.org",
"domainPrefix": "api",
"http": {
"method": "PUT",
"path": "/sondes/telemetry",
"protocol": "HTTP/1.1",
"sourceIp": "103.107.130.22",
"userAgent": "autorx-1.4.1-beta4",
},
"requestId": "Z_NJvh0RoAMEJaw=",
"routeKey": "PUT /sondes/telemetry",
"stage": "$default",
"time": "31/Jan/2021:00:10:25 +0000",
"timeEpoch": 1612051825409,
},
"body": """
{"software_name": "radiosonde_auto_rx", "software_version": "1.5.8-beta2", "uploader_callsign": "LZ3DJ-18", "uploader_position": null, "uploader_antenna": "Dipole", "uploader_contact_email": "none@none.com", "mobile": false}
""",
"isBase64Encoded": False,
}
print(lambda_handler(payload, {}))

View File

@ -23,17 +23,11 @@ EOF
max_session_duration = 3600
}
data "archive_file" "sign_socket" {
type = "zip"
source_file = "sign-websocket/lambda_function.py"
output_path = "${path.module}/build/sign_socket.zip"
}
resource "aws_lambda_function" "sign_socket" {
function_name = "sign-websocket"
handler = "lambda_function.lambda_handler"
filename = "${path.module}/build/sign_socket.zip"
source_code_hash = data.archive_file.sign_socket.output_base64sha256
handler = "sign-websocket.lambda_handler"
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
publish = true
memory_size = 128
role = aws_iam_role.sign_socket.arn