change async and remove alt requirement on listeners

This commit is contained in:
Michaela 2021-05-24 16:54:41 +10:00
parent c886065f09
commit 27824549af
2 changed files with 40 additions and 27 deletions

View File

@ -514,7 +514,6 @@ def get_listeners(event, context):
"filter": [
{"match_all": {}},
{"exists": {"field": "uploader_position_elk"},},
{"exists": {"field": "uploader_alt"},},
{"exists": {"field": "uploader_antenna.keyword"},},
{"exists": {"field": "software_name.keyword"},},
{"exists": {"field": "software_version.keyword"},},
@ -563,7 +562,7 @@ def get_listeners(event, context):
.replace(" ", "")
.split(",")[0]
),
"alt": float(listener["1"]["hits"]["hits"][0]["fields"]["uploader_alt"][0]),
"alt": float(listener["1"]["hits"]["hits"][0]["fields"]["uploader_alt"][0]) if "uploader_alt" in listener["1"]["hits"]["hits"][0]["fields"] else 0,
"description": f"""\n
<font size=\"-2\"><BR>\n
<B>Radio: {html.escape(listener["1"]["hits"]["hits"][0]["fields"]["software_name.keyword"][0])}-{html.escape(listener["1"]["hits"]["hits"][0]["fields"]["software_version.keyword"][0])}</B><BR>\n
@ -601,21 +600,21 @@ if __name__ == "__main__":
# max_positions: 0
# position_id: 0
# vehicles: RS_*;*chase
print(
datanew(
{
"queryStringParameters": {
"type": "positions",
"mode": "12hours",
"position_id": "0",
"vehicles": ""
}
},
{},
)
)
# print(
# get_listeners(
# {},{}
# datanew(
# {
# "queryStringParameters": {
# "type": "positions",
# "mode": "12hours",
# "position_id": "0",
# "vehicles": ""
# }
# },
# {},
# )
# )
print(
get_listeners(
{},{}
)
)

View File

@ -16,6 +16,18 @@ import os
import asyncio
import aioboto3
import asyncio
from functools import wraps, partial
def async_wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
# this needs a bunch of refactor but the general approach is
# connect to mqtt via websockets during init
# if we detect that we are disconnected then reconnect
@ -36,7 +48,7 @@ event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
io.init_logging(io.LogLevel.Error, "stderr")
#io.init_logging(io.LogLevel.Error, "stderr")
def connect():
@ -60,13 +72,19 @@ def connect():
connect()
sns = boto3.client("sns",region_name="us-east-1")
sns.meta.events.register('request-created.dynamodb', set_connection_header)
@async_wrap
def post(payload):
sns.publish(
TopicArn=os.getenv("SNS_TOPIC"),
Message=json.dumps(payload)
)
async def upload(event, context):
global connect_future, mqtt_connection
async with aioboto3.client("sns",region_name="us-east-1") as sns:
sns.meta.events.register('request-created.dynamodb', set_connection_header)
global connect_future, mqtt_connection
tasks = []
# Future.result() waits until a result is available
try:
@ -109,16 +127,12 @@ async def upload(event, context):
payload["uploader_position"][2],
f"{payload['uploader_position'][0]},{payload['uploader_position'][1]}",
)
tasks.append(sns.publish(
TopicArn=os.getenv("SNS_TOPIC"),
Message=json.dumps(payload)
))
tasks.append(post(payload))
(msg, x) = mqtt_connection.publish(
topic=f'sondes/{payload["serial"]}',
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
)
try:
msg.result()
except (RuntimeError, AwsCrtError):