Move notifications under controller router

This commit is contained in:
grossmj
2023-09-02 18:57:53 +07:00
parent 5155aea923
commit a358369606
3 changed files with 66 additions and 95 deletions

View File

@ -18,9 +18,12 @@ import asyncio
import signal
import os
from fastapi import APIRouter, Depends, Request, Response, status
from fastapi import APIRouter, Request, Depends, WebSocket, WebSocketDisconnect, status
from fastapi.responses import StreamingResponse
from fastapi.encoders import jsonable_encoder
from fastapi.routing import Mount
from websockets.exceptions import ConnectionClosed, WebSocketException
from typing import List
from gns3server.config import Config
@ -29,7 +32,7 @@ from gns3server.version import __version__
from gns3server.controller.controller_error import ControllerError, ControllerForbiddenError
from gns3server import schemas
from .dependencies.authentication import get_current_active_user
from .dependencies.authentication import get_current_active_user, get_current_active_user_from_websocket
import logging
@ -174,6 +177,57 @@ async def statistics() -> List[dict]:
return compute_statistics
@router.get("/notifications", dependencies=[Depends(get_current_active_user)])
async def controller_http_notifications(request: Request) -> StreamingResponse:
"""
Receive controller notifications about the controller from HTTP stream.
"""
from gns3server.api.server import app
log.info(f"New client {request.client.host}:{request.client.port} has connected to controller HTTP "
f"notification stream")
async def event_stream():
try:
with Controller.instance().notification.controller_queue() as queue:
while not app.state.exiting:
msg = await queue.get_json(5)
yield f"{msg}\n".encode("utf-8")
finally:
log.info(f"Client {request.client.host}:{request.client.port} has disconnected from controller HTTP "
f"notification stream")
return StreamingResponse(event_stream(), media_type="application/json")
@router.websocket("/notifications/ws")
async def controller_ws_notifications(
websocket: WebSocket,
current_user: schemas.User = Depends(get_current_active_user_from_websocket)
) -> None:
"""
Receive project notifications about the controller from WebSocket.
"""
if current_user is None:
return
log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller WebSocket")
try:
with Controller.instance().notification.controller_queue() as queue:
while True:
notification = await queue.get_json(5)
await websocket.send_text(notification)
except (ConnectionClosed, WebSocketDisconnect):
log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket")
except WebSocketException as e:
log.warning(f"Error while sending to controller event to WebSocket client: {e}")
finally:
try:
await websocket.close()
except OSError:
pass # ignore OSError: [Errno 107] Transport endpoint is not connected
# @Route.post(
# r"/debug",
# description="Dump debug information to disk (debug directory in config directory). Work only for local server",