From 31a2cb998db48da3c1958871c5c3ae4751580798 Mon Sep 17 00:00:00 2001 From: grossmj Date: Sun, 17 Nov 2024 14:39:22 +1000 Subject: [PATCH] Fix issue with asyncio.Queue which is not thread safe. --- gns3server/compute/notification_manager.py | 5 +++-- gns3server/controller/notification.py | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/gns3server/compute/notification_manager.py b/gns3server/compute/notification_manager.py index 82388b0a..b18e92d2 100644 --- a/gns3server/compute/notification_manager.py +++ b/gns3server/compute/notification_manager.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . - +import asyncio from contextlib import contextmanager from gns3server.utils.notification_queue import NotificationQueue @@ -28,6 +28,7 @@ class NotificationManager: def __init__(self): self._listeners = set() + self._loop = asyncio.get_event_loop() @contextmanager def queue(self): @@ -54,7 +55,7 @@ class NotificationManager: """ for listener in self._listeners: - listener.put_nowait((action, event, kwargs)) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, kwargs)) @staticmethod def reset(): diff --git a/gns3server/controller/notification.py b/gns3server/controller/notification.py index 48d5a3d2..4fa25e76 100644 --- a/gns3server/controller/notification.py +++ b/gns3server/controller/notification.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os +import asyncio from contextlib import contextmanager from gns3server.utils.notification_queue import NotificationQueue @@ -32,6 +32,7 @@ class Notification: self._controller = controller self._project_listeners = {} self._controller_listeners = set() + self._loop = asyncio.get_event_loop() @contextmanager def project_queue(self, project_id): @@ -73,7 +74,7 @@ class Notification: """ for controller_listener in self._controller_listeners: - controller_listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(controller_listener.put_nowait, (action, event, {})) def project_has_listeners(self, project_id): """ @@ -134,7 +135,7 @@ class Notification: except KeyError: return for listener in project_listeners: - listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {})) def _send_event_to_all_projects(self, action, event): """ @@ -146,4 +147,4 @@ class Notification: """ for project_listeners in self._project_listeners.values(): for listener in project_listeners: - listener.put_nowait((action, event, {})) + self._loop.call_soon_threadsafe(listener.put_nowait, (action, event, {}))