diff --git a/gns3server/compute/base_manager.py b/gns3server/compute/base_manager.py index fcf26e7e..9584fe1e 100644 --- a/gns3server/compute/base_manager.py +++ b/gns3server/compute/base_manager.py @@ -460,10 +460,7 @@ class BaseManager: if not data: await asyncio.sleep(0.1) continue - try: - await response.write(data) - except ConnectionError: - break + await response.write(data) except FileNotFoundError: raise aiohttp.web.HTTPNotFound() except PermissionError: diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index 31c59ce1..e36c27fe 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -446,13 +446,14 @@ class Compute: msg = json.loads(response.data) action = msg.pop("action") event = msg.pop("event") + project_id = msg.pop("project_id", None) if action == "ping": self._cpu_usage_percent = event["cpu_usage_percent"] self._memory_usage_percent = event["memory_usage_percent"] #FIXME: slow down number of compute events self._controller.notification.controller_emit("compute.updated", self.__json__()) else: - await self._controller.notification.dispatch(action, event, compute_id=self.id) + await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id) elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None: self._connected = False break diff --git a/gns3server/controller/drawing.py b/gns3server/controller/drawing.py index c730cd80..58605e6d 100644 --- a/gns3server/controller/drawing.py +++ b/gns3server/controller/drawing.py @@ -185,7 +185,7 @@ class Drawing: data = self.__json__() if not svg_changed: del data["svg"] - self._project.controller.notification.project_emit("drawing.updated", data) + self._project.emit_notification("drawing.updated", data) self._project.dump() def __json__(self, topology_dump=False): diff --git a/gns3server/controller/export_project.py b/gns3server/controller/export_project.py index 9937130e..9afcd906 100644 --- a/gns3server/controller/export_project.py +++ b/gns3server/controller/export_project.py @@ -74,7 +74,7 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp except OSError as e: msg = "Could not export file {}: {}".format(path, e) log.warning(msg) - project.controller.notification.project_emit("log.warning", {"message": msg}) + project.emit_notification("log.warning", {"message": msg}) continue # ignore the .gns3 file if file.endswith(".gns3"): diff --git a/gns3server/controller/link.py b/gns3server/controller/link.py index ab413741..1b2c27a9 100644 --- a/gns3server/controller/link.py +++ b/gns3server/controller/link.py @@ -199,14 +199,14 @@ class Link: self._filters = new_filters if self._created: await self.update() - self._project.controller.notification.project_emit("link.updated", self.__json__()) + self._project.emit_notification("link.updated", self.__json__()) self._project.dump() async def update_suspend(self, value): if value != self._suspended: self._suspended = value await self.update() - self._project.controller.notification.project_emit("link.updated", self.__json__()) + self._project.emit_notification("link.updated", self.__json__()) self._project.dump() @property @@ -269,7 +269,7 @@ class Link: n["node"].add_link(self) n["port"].link = self self._created = True - self._project.controller.notification.project_emit("link.created", self.__json__()) + self._project.emit_notification("link.created", self.__json__()) if dump: self._project.dump() @@ -282,7 +282,7 @@ class Link: label = node_data.get("label") if label: port["label"] = label - self._project.controller.notification.project_emit("link.updated", self.__json__()) + self._project.emit_notification("link.updated", self.__json__()) self._project.dump() async def create(self): @@ -317,7 +317,7 @@ class Link: self._capturing = True self._capture_file_name = capture_file_name - self._project.controller.notification.project_emit("link.updated", self.__json__()) + self._project.emit_notification("link.updated", self.__json__()) async def stop_capture(self): """ @@ -325,7 +325,7 @@ class Link: """ self._capturing = False - self._project.controller.notification.project_emit("link.updated", self.__json__()) + self._project.emit_notification("link.updated", self.__json__()) def pcap_streaming_url(self): """ diff --git a/gns3server/controller/node.py b/gns3server/controller/node.py index 4e0b039c..60e70c01 100644 --- a/gns3server/controller/node.py +++ b/gns3server/controller/node.py @@ -405,7 +405,7 @@ class Node: await self.parse_node_response(response.json) elif old_json != self.__json__(): # We send notif only if object has changed - self.project.controller.notification.project_emit("node.updated", self.__json__()) + self.project.emit_notification("node.updated", self.__json__()) self.project.dump() async def parse_node_response(self, response): @@ -563,13 +563,13 @@ class Node: for directory in images_directories(type): image = os.path.join(directory, img) if os.path.exists(image): - self.project.controller.notification.project_emit("log.info", {"message": "Uploading missing image {}".format(img)}) + self.project.emit_notification("log.info", {"message": "Uploading missing image {}".format(img)}) try: with open(image, 'rb') as f: await self._compute.post("/{}/images/{}".format(self._node_type, os.path.basename(img)), data=f, timeout=None) except OSError as e: raise aiohttp.web.HTTPConflict(text="Can't upload {}: {}".format(image, str(e))) - self.project.controller.notification.project_emit("log.info", {"message": "Upload finished for {}".format(img)}) + self.project.emit_notification("log.info", {"message": "Upload finished for {}".format(img)}) return True return False diff --git a/gns3server/controller/notification.py b/gns3server/controller/notification.py index 97e0883c..7190195d 100644 --- a/gns3server/controller/notification.py +++ b/gns3server/controller/notification.py @@ -33,19 +33,19 @@ class Notification: self._controller_listeners = [] @contextmanager - def project_queue(self, project): + def project_queue(self, project_id): """ Get a queue of notifications Use it with Python with """ queue = NotificationQueue() - self._project_listeners.setdefault(project.id, set()) - self._project_listeners[project.id].add(queue) + self._project_listeners.setdefault(project_id, set()) + self._project_listeners[project_id].add(queue) try: yield queue finally: - self._project_listeners[project.id].remove(queue) + self._project_listeners[project_id].remove(queue) @contextmanager def controller_queue(self): @@ -84,14 +84,14 @@ class Notification: for controller_listener in self._controller_listeners: controller_listener.put_nowait((action, event, {})) - def project_has_listeners(self, project): + def project_has_listeners(self, project_id): """ :param project_id: Project object :returns: True if client listen this project """ - return project.id in self._project_listeners and len(self._project_listeners[project.id]) > 0 + return project_id in self._project_listeners and len(self._project_listeners[project_id]) > 0 - async def dispatch(self, action, event, compute_id): + async def dispatch(self, action, event, project_id, compute_id): """ Notification received from compute node. Send it directly to clients or process it @@ -110,13 +110,13 @@ class Notification: self.project_emit("node.updated", node.__json__()) except (aiohttp.web.HTTPNotFound, aiohttp.web.HTTPForbidden): # Project closing return - elif action == "ping": - event["compute_id"] = compute_id - self.project_emit(action, event) + # elif action == "ping": + # event["compute_id"] = compute_id + # self.project_emit(action, event) else: - self.project_emit(action, event) + self.project_emit(action, event, project_id) - def project_emit(self, action, event): + def project_emit(self, action, event, project_id=None): """ Send a notification to clients scoped by projects @@ -136,8 +136,8 @@ class Notification: except TypeError: # If we receive a mock as an event it will raise TypeError when using json dump pass - if "project_id" in event: - self._send_event_to_project(event["project_id"], action, event) + if "project_id" in event or project_id: + self._send_event_to_project(event.get("project_id", project_id), action, event) else: self._send_event_to_all_projects(action, event) diff --git a/gns3server/controller/project.py b/gns3server/controller/project.py index a8384324..a7407a69 100644 --- a/gns3server/controller/project.py +++ b/gns3server/controller/project.py @@ -122,6 +122,16 @@ class Project: assert self._status != "closed" self.dump() + def emit_notification(self, action, event): + """ + Emit a notification to all clients using this project. + + :param action: Action name + :param event: Event to send + """ + + self.controller.notification.project_emit(action, event, project_id=self.id) + async def update(self, **kwargs): """ Update the project @@ -135,7 +145,7 @@ class Project: # We send notif only if object has changed if old_json != self.__json__(): - self.controller.notification.project_emit("project.updated", self.__json__()) + self.emit_notification("project.updated", self.__json__()) self.dump() # update on computes @@ -533,7 +543,7 @@ class Project: self._project_created_on_compute.add(compute) await node.create() self._nodes[node.id] = node - self.controller.notification.project_emit("node.created", node.__json__()) + self.emit_notification("node.created", node.__json__()) if dump: self.dump() return node @@ -558,7 +568,7 @@ class Project: del self._nodes[node.id] await node.destroy() self.dump() - self.controller.notification.project_emit("node.deleted", node.__json__()) + self.emit_notification("node.deleted", node.__json__()) @open_required def get_node(self, node_id): @@ -623,7 +633,7 @@ class Project: if drawing_id not in self._drawings: drawing = Drawing(self, drawing_id=drawing_id, **kwargs) self._drawings[drawing.id] = drawing - self.controller.notification.project_emit("drawing.created", drawing.__json__()) + self.emit_notification("drawing.created", drawing.__json__()) if dump: self.dump() return drawing @@ -644,7 +654,7 @@ class Project: drawing = self.get_drawing(drawing_id) del self._drawings[drawing.id] self.dump() - self.controller.notification.project_emit("drawing.deleted", drawing.__json__()) + self.emit_notification("drawing.deleted", drawing.__json__()) @open_required async def add_link(self, link_id=None, dump=True): @@ -671,7 +681,7 @@ class Project: if force_delete is False: raise self.dump() - self.controller.notification.project_emit("link.deleted", link.__json__()) + self.emit_notification("link.deleted", link.__json__()) @open_required def get_link(self, link_id): @@ -743,7 +753,7 @@ class Project: self._clean_pictures() self._status = "closed" if not ignore_notification: - self.controller.notification.project_emit("project.closed", self.__json__()) + self.emit_notification("project.closed", self.__json__()) self.reset() def _clean_pictures(self): diff --git a/gns3server/controller/snapshot.py b/gns3server/controller/snapshot.py index c7321047..ebe22b58 100644 --- a/gns3server/controller/snapshot.py +++ b/gns3server/controller/snapshot.py @@ -122,7 +122,7 @@ class Snapshot: except (OSError, PermissionError) as e: raise aiohttp.web.HTTPConflict(text=str(e)) await project.open() - self._project.controller.notification.project_emit("snapshot.restored", self.__json__()) + self._project.emit_notification("snapshot.restored", self.__json__()) return self._project def __json__(self): diff --git a/gns3server/handlers/api/controller/project_handler.py b/gns3server/handlers/api/controller/project_handler.py index 66ff6294..69d45ed0 100644 --- a/gns3server/handlers/api/controller/project_handler.py +++ b/gns3server/handlers/api/controller/project_handler.py @@ -220,24 +220,31 @@ class ProjectHandler: async def notification(request, response): controller = Controller.instance() - project = controller.get_project(request.match_info["project_id"]) - + project_id = request.match_info["project_id"] response.content_type = "application/json" response.set_status(200) response.enable_chunked_encoding() - await response.prepare(request) - with controller.notification.project_queue(project) as queue: - while True: - msg = await queue.get_json(5) - await response.write(("{}\n".format(msg)).encode("utf-8")) + log.info("New client has connected to the notification stream for project ID '{}' (HTTP long-polling method)".format(project_id)) - if project.auto_close: - # To avoid trouble with client connecting disconnecting we sleep few seconds before checking - # if someone else is not connected - await asyncio.sleep(5) - if not controller.notification.project_has_listeners(project): - await project.close() + try: + with controller.notification.project_queue(project_id) as queue: + while True: + msg = await queue.get_json(5) + await response.write(("{}\n".format(msg)).encode("utf-8")) + finally: + log.info("Client has disconnected from notification for project ID '{}' (HTTP long-polling method)".format(project_id)) + try: + project = controller.get_project(project_id) + if project.auto_close: + # To avoid trouble with client connecting disconnecting we sleep few seconds before checking + # if someone else is not connected + await asyncio.sleep(5) + if not controller.notification.project_has_listeners(project.id): + log.info("Project '{}' is automatically closing due to no client listening".format(project.id)) + await project.close() + except aiohttp.web.HTTPNotFound: + pass @Route.get( r"/projects/{project_id}/notifications/ws", @@ -252,31 +259,36 @@ class ProjectHandler: async def notification_ws(request, response): controller = Controller.instance() - project = controller.get_project(request.match_info["project_id"]) - + project_id = request.match_info["project_id"] ws = aiohttp.web.WebSocketResponse() await ws.prepare(request) request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) + log.info("New client has connected to the notification stream for project ID '{}' (WebSocket method)".format(project_id)) try: - with controller.notification.project_queue(project) as queue: + with controller.notification.project_queue(project_id) as queue: while True: notification = await queue.get_json(5) if ws.closed: break await ws.send_str(notification) finally: + log.info("Client has disconnected from notification stream for project ID '{}' (WebSocket method)".format(project_id)) if not ws.closed: await ws.close() request.app['websockets'].discard(ws) - - if project.auto_close: - # To avoid trouble with client connecting disconnecting we sleep few seconds before checking - # if someone else is not connected - await asyncio.sleep(5) - if not controller.notification.project_has_listeners(project): - await project.close() + try: + project = controller.get_project(project_id) + if project.auto_close: + # To avoid trouble with client connecting disconnecting we sleep few seconds before checking + # if someone else is not connected + await asyncio.sleep(5) + if not controller.notification.project_has_listeners(project_id): + log.info("Project '{}' is automatically closing due to no client listening".format(project.id)) + await project.close() + except aiohttp.web.HTTPNotFound: + pass return ws @@ -298,9 +310,7 @@ class ProjectHandler: try: with tempfile.TemporaryDirectory() as tmp_dir: - stream = await export_project(project, - tmp_dir, - include_images=bool(int(request.query.get("include_images", "0")))) + stream = await export_project(project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0")))) # We need to do that now because export could failed and raise an HTTP error # that why response start need to be the later possible response.content_type = 'application/gns3project'