relay SignalR integrations through a storage queue (#1100)

The SignalR integration from Azure Functions does not have automatic retry.  When the SignalR instance has issues, all other APIs fail.

To make the service resilient to SignalR outages, this bounces SignalR events through an Azure Storage queue.

NOTE: This PR does not remove the integration from all of the functions.  That is intended to be done as a follow-on PR.
This commit is contained in:
bmc-msft
2021-07-22 14:10:20 -04:00
committed by GitHub
parent ee3d0871f2
commit 5be9c4dcee
4 changed files with 45 additions and 19 deletions

View File

@ -3,36 +3,31 @@
# Copyright (c) Microsoft Corporation. # Copyright (c) Microsoft Corporation.
# Licensed under the MIT License. # Licensed under the MIT License.
import json
import logging import logging
from queue import Empty, Queue from typing import List, Optional
from typing import Optional
from onefuzztypes.events import Event, EventMessage, EventType, get_event_type from onefuzztypes.events import Event, EventMessage, EventType, get_event_type
from onefuzztypes.models import UserInfo from onefuzztypes.models import UserInfo
from pydantic import BaseModel from pydantic import BaseModel
from .azure.creds import get_instance_id, get_instance_name from .azure.creds import get_instance_id, get_instance_name
from .azure.queue import send_message
from .azure.storage import StorageType
from .webhooks import Webhook from .webhooks import Webhook
EVENTS: Queue = Queue()
class SignalREvent(BaseModel):
target: str
arguments: List[EventMessage]
def queue_signalr_event(event_message: EventMessage) -> None:
message = SignalREvent(target="events", arguments=[event_message]).json().encode()
send_message("signalr-events", message, StorageType.config)
def get_events() -> Optional[str]: def get_events() -> Optional[str]:
events = [] return None
for _ in range(5):
try:
event = EVENTS.get(block=False)
events.append(json.loads(event))
EVENTS.task_done()
except Empty:
break
if events:
return json.dumps({"target": "events", "arguments": events})
else:
return None
def log_event(event: Event, event_type: EventType) -> None: def log_event(event: Event, event_type: EventType) -> None:
@ -85,6 +80,6 @@ def send_event(event: Event) -> None:
if event_message.event != event: if event_message.event != event:
event_message.event = event.copy(deep=True) event_message.event = event.copy(deep=True)
EVENTS.put(event_message.json()) queue_signalr_event(event_message)
Webhook.send_event(event_message) Webhook.send_event(event_message)
log_event(event, event_type) log_event(event, event_type)

View File

@ -0,0 +1,12 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import azure.functions as func
def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
body = msg.get_body().decode()
dashboard.set(body)

View File

@ -0,0 +1,18 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "msg",
"type": "queueTrigger",
"direction": "in",
"queueName": "signalr-events",
"connection": "AzureWebJobsStorage"
},
{
"type": "signalR",
"direction": "out",
"name": "dashboard",
"hubName": "dashboard"
}
]
}

View File

@ -584,6 +584,7 @@ class Client:
"proxy", "proxy",
"update-queue", "update-queue",
"webhooks", "webhooks",
"signalr-events",
]: ]:
try: try:
client.create_queue(queue) client.create_queue(queue)