From e9ec5c8a37e35857a76e3db80f0b13d812a69097 Mon Sep 17 00:00:00 2001 From: Julien Duponchelle Date: Wed, 4 Mar 2015 16:01:56 +0100 Subject: [PATCH] A notification stream with process monitoring --- gns3server/handlers/api/project_handler.py | 56 ++++++++++++++++++++- gns3server/modules/base_vm.py | 13 +++++ gns3server/modules/dynamips/hypervisor.py | 10 ++++ gns3server/modules/dynamips/nodes/router.py | 17 ++++++- gns3server/modules/iou/iou_vm.py | 42 +++++++++++----- gns3server/modules/iou/ioucon.py | 1 - gns3server/modules/project.py | 26 ++++++++++ gns3server/modules/qemu/qemu_vm.py | 22 ++++++-- gns3server/modules/vpcs/vpcs_vm.py | 18 ++++++- gns3server/schemas/vpcs.py | 6 ++- gns3server/utils/asyncio.py | 18 +++++++ tests/handlers/api/test_project.py | 36 +++++++++++++ tests/handlers/api/test_vpcs.py | 4 +- tests/modules/vpcs/test_vpcs_vm.py | 10 ++++ tox.ini | 2 +- 15 files changed, 256 insertions(+), 25 deletions(-) diff --git a/gns3server/handlers/api/project_handler.py b/gns3server/handlers/api/project_handler.py index e31d32bc..74744a45 100644 --- a/gns3server/handlers/api/project_handler.py +++ b/gns3server/handlers/api/project_handler.py @@ -15,14 +15,23 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio +import json + from ...web.route import Route from ...schemas.project import PROJECT_OBJECT_SCHEMA, PROJECT_CREATE_SCHEMA, PROJECT_UPDATE_SCHEMA from ...modules.project_manager import ProjectManager from ...modules import MODULES +import logging +log = logging.getLogger() + class ProjectHandler: + # How many clients has subcribe to notifications + _notifications_listening = 0 + @classmethod @Route.post( r"/projects", @@ -123,8 +132,9 @@ class ProjectHandler: pm = ProjectManager.instance() project = pm.get_project(request.match_info["project_id"]) - yield from project.close() - pm.remove_project(project.id) + if ProjectHandler._notifications_listening == 0: + yield from project.close() + pm.remove_project(project.id) response.set_status(204) @classmethod @@ -145,3 +155,45 @@ class ProjectHandler: yield from project.delete() pm.remove_project(project.id) response.set_status(204) + + @classmethod + @Route.get( + r"/projects/{project_id}/notifications", + description="Receive notifications about the projects", + parameters={ + "project_id": "The UUID of the project", + }, + status_codes={ + 200: "End of stream", + 404: "The project doesn't exist" + }) + def notification(request, response): + + pm = ProjectManager.instance() + project = pm.get_project(request.match_info["project_id"]) + + response.content_type = "application/json" + response.set_status(200) + response.enable_chunked_encoding() + # Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed + response.content_length = None + + response.start(request) + queue = project.get_listen_queue() + ProjectHandler._notifications_listening += 1 + response.write("{\"action\": \"ping\"}\n".encode("utf-8")) + while True: + try: + (action, msg) = yield from asyncio.wait_for(queue.get(), 5) + if hasattr(msg, "__json__"): + msg = json.dumps({"action": action, "event": msg.__json__()}, sort_keys=True) + else: + msg = json.dumps({"action": action, "event": msg}, sort_keys=True) + log.debug("Send notification: %s", msg) + response.write(("{}\n".format(msg)).encode("utf-8")) + except asyncio.futures.CancelledError as e: + break + except asyncio.futures.TimeoutError as e: + response.write("{\"action\": \"ping\"}\n".encode("utf-8")) + project.stop_listen_queue(queue) + ProjectHandler._notifications_listening -= 1 diff --git a/gns3server/modules/base_vm.py b/gns3server/modules/base_vm.py index ffea595e..f9dbc2df 100644 --- a/gns3server/modules/base_vm.py +++ b/gns3server/modules/base_vm.py @@ -48,6 +48,7 @@ class BaseVM: self._manager = manager self._console = console self._temporary_directory = None + self._vm_status = "stopped" if self._console is not None: self._console = self._manager.port_manager.reserve_tcp_port(self._console, self._project) @@ -66,6 +67,18 @@ class BaseVM: if os.path.exists(self._temporary_directory): shutil.rmtree(self._temporary_directory, ignore_errors=True) + @property + def status(self): + """Return current VM status""" + + return self._vm_status + + @status.setter + def status(self, status): + + self._vm_status = status + self._project.emit("vm.{}".format(status), self) + @property def project(self): """ diff --git a/gns3server/modules/dynamips/hypervisor.py b/gns3server/modules/dynamips/hypervisor.py index a568ff51..10349019 100644 --- a/gns3server/modules/dynamips/hypervisor.py +++ b/gns3server/modules/dynamips/hypervisor.py @@ -70,6 +70,16 @@ class Hypervisor(DynamipsHypervisor): return self._id + @property + def process(self): + """ + Returns the subprocess of the Hypervisor + + :returns: subprocess + """ + + return self._process + @property def started(self): """ diff --git a/gns3server/modules/dynamips/nodes/router.py b/gns3server/modules/dynamips/nodes/router.py index 77045290..adb1a1ef 100644 --- a/gns3server/modules/dynamips/nodes/router.py +++ b/gns3server/modules/dynamips/nodes/router.py @@ -35,7 +35,8 @@ from ...base_vm import BaseVM from ..dynamips_error import DynamipsError from ..nios.nio_udp import NIOUDP -from gns3server.utils.asyncio import wait_run_in_executor +from gns3server.config import Config +from gns3server.utils.asyncio import wait_run_in_executor, monitor_process class Router(BaseVM): @@ -162,7 +163,7 @@ class Router(BaseVM): slot_number += 1 # add the wics - if self._slots[0] and self._slots[0].wics: + if len(self._slots) > 0 and self._slots[0] and self._slots[0].wics: for wic_slot_number in range(0, len(self._slots[0].wics)): if self._slots[0].wics[wic_slot_number]: router_info["wic" + str(wic_slot_number)] = str(self._slots[0].wics[wic_slot_number]) @@ -251,7 +252,18 @@ class Router(BaseVM): raise DynamipsError('"{}" is not a valid IOS image'.format(self._image)) yield from self._hypervisor.send('vm start "{name}"'.format(name=self._name)) + self.status = "started" log.info('router "{name}" [{id}] has been started'.format(name=self._name, id=self._id)) + monitor_process(self._hypervisor.process, self._termination_callback) + + @asyncio.coroutine + def _termination_callback(self, returncode): + """ + Called when the process is killed + + :param returncode: Process returncode + """ + self.status = "stopped" @asyncio.coroutine def stop(self): @@ -262,6 +274,7 @@ class Router(BaseVM): status = yield from self.get_status() if status != "inactive": yield from self._hypervisor.send('vm stop "{name}"'.format(name=self._name)) + self.status = "stopped" log.info('Router "{name}" [{id}] has been stopped'.format(name=self._name, id=self._id)) @asyncio.coroutine diff --git a/gns3server/modules/iou/iou_vm.py b/gns3server/modules/iou/iou_vm.py index b7214bd4..3e86e8aa 100644 --- a/gns3server/modules/iou/iou_vm.py +++ b/gns3server/modules/iou/iou_vm.py @@ -465,6 +465,8 @@ class IOUVM(BaseVM): env=env) log.info("IOU instance {} started PID={}".format(self._id, self._iou_process.pid)) self._started = True + self.status = "started" + gns3server.utils.asyncio.monitor_process(self._iou_process, self._termination_callback) except FileNotFoundError as e: raise IOUError("Could not start IOU: {}: 32-bit binary support is probably not installed".format(e)) except (OSError, subprocess.SubprocessError) as e: @@ -477,6 +479,17 @@ class IOUVM(BaseVM): # connections support yield from self._start_iouyap() + def _termination_callback(self, returncode): + """ + Called when the process is killed + + :param returncode: Process returncode + """ + log.info("IOU process crash return code: %d", returncode) + self._terminate_process_iou() + self._terminate_process_iouyap() + self._ioucon_thread_stop_event.set() + def _rename_nvram_file(self): """ Before starting the VM, rename the nvram and vlan.dat files with the correct IOU application identifier. @@ -507,6 +520,7 @@ class IOUVM(BaseVM): stderr=subprocess.STDOUT, cwd=self.working_dir) + gns3server.utils.asyncio.monitor_process(self._iouyap_process, self._termination_callback) log.info("iouyap started PID={}".format(self._iouyap_process.pid)) except (OSError, subprocess.SubprocessError) as e: iouyap_stdout = self.read_iouyap_stdout() @@ -615,24 +629,28 @@ class IOUVM(BaseVM): Terminate the IOUYAP process if running. """ - log.info('Stopping IOUYAP process for IOU VM "{}" PID={}'.format(self.name, self._iouyap_process.pid)) - try: - self._iouyap_process.terminate() - # Sometime the process may already be dead when we garbage collect - except ProcessLookupError: - pass + if self._iouyap_process: + log.info('Stopping IOUYAP process for IOU VM "{}" PID={}'.format(self.name, self._iouyap_process.pid)) + try: + self._iouyap_process.terminate() + # Sometime the process can already be dead when we garbage collect + except ProcessLookupError: + pass def _terminate_process_iou(self): """ Terminate the IOU process if running """ - log.info('Stopping IOU process for IOU VM "{}" PID={}'.format(self.name, self._iou_process.pid)) - try: - self._iou_process.terminate() - # Sometime the process may already be dead when we garbage collect - except ProcessLookupError: - pass + if self._iou_process: + log.info('Stopping IOU process for IOU VM "{}" PID={}'.format(self.name, self._iou_process.pid)) + try: + self._iou_process.terminate() + # Sometime the process can already be dead when we garbage collect + except ProcessLookupError: + pass + self._started = False + self.status = "stopped" @asyncio.coroutine def reload(self): diff --git a/gns3server/modules/iou/ioucon.py b/gns3server/modules/iou/ioucon.py index 764e81e8..475109b6 100644 --- a/gns3server/modules/iou/ioucon.py +++ b/gns3server/modules/iou/ioucon.py @@ -541,7 +541,6 @@ def send_recv_loop(epoll, console, router, esc_char, stop_event): else: router.write(buf) finally: - log.debug("Finally") router.unregister(epoll) console.unregister(epoll) diff --git a/gns3server/modules/project.py b/gns3server/modules/project.py index edb1b6cf..8639b8ce 100644 --- a/gns3server/modules/project.py +++ b/gns3server/modules/project.py @@ -65,6 +65,9 @@ class Project: self._used_tcp_ports = set() self._used_udp_ports = set() + # List of clients listen for notifications + self._listeners = set() + if path is None: path = os.path.join(self._location, self._id) try: @@ -416,3 +419,26 @@ class Project: # We import it at the last time to avoid circular dependencies from ..modules import MODULES return MODULES + + def emit(self, action, event): + """ + Send an event to all the client listens for notifications + + :param action: Action happened + :param event: Event sended to the client + """ + for listener in self._listeners: + listener.put_nowait((action, event, )) + + def get_listen_queue(self): + """Get a queue where you receive all the events related to the + project.""" + + queue = asyncio.Queue() + self._listeners.add(queue) + return queue + + def stop_listen_queue(self, queue): + """Stop sending notification to this clients""" + + self._listeners.remove(queue) diff --git a/gns3server/modules/qemu/qemu_vm.py b/gns3server/modules/qemu/qemu_vm.py index 85788a80..cfcbf2c8 100644 --- a/gns3server/modules/qemu/qemu_vm.py +++ b/gns3server/modules/qemu/qemu_vm.py @@ -35,6 +35,8 @@ from ..nios.nio_udp import NIOUDP from ..nios.nio_tap import NIOTAP from ..base_vm import BaseVM from ...schemas.qemu import QEMU_OBJECT_SCHEMA +from ...utils.asyncio import monitor_process +from ...config import Config import logging log = logging.getLogger(__name__) @@ -62,7 +64,6 @@ class QemuVM(BaseVM): self._host = server_config.get("host", "127.0.0.1") self._monitor_host = server_config.get("monitor_host", "127.0.0.1") self._command = [] - self._started = False self._process = None self._cpulimit_process = None self._monitor = None @@ -581,7 +582,9 @@ class QemuVM(BaseVM): stderr=subprocess.STDOUT, cwd=self.working_dir) log.info('QEMU VM "{}" started PID={}'.format(self._name, self._process.pid)) - self._started = True + + self.status = "started" + monitor_process(self._process, self._termination_callback) except (OSError, subprocess.SubprocessError) as e: stdout = self.read_stdout() log.error("Could not start QEMU {}: {}\n{}".format(self.qemu_path, e, stdout)) @@ -591,6 +594,17 @@ class QemuVM(BaseVM): if self._cpu_throttling: self._set_cpu_throttling() + def _termination_callback(self, returncode): + """ + Called when the process is killed + + :param returncode: Process returncode + """ + if self.started: + log.info("Process Qemu is dead. Return code: %d", returncode) + self.status = "stopped" + self._process = None + @asyncio.coroutine def stop(self): """ @@ -608,7 +622,7 @@ class QemuVM(BaseVM): if self._process.returncode is None: log.warn('QEMU VM "{}" PID={} is still running'.format(self._name, self._process.pid)) self._process = None - self._started = False + self.status = "stopped" self._stop_cpulimit() @asyncio.coroutine @@ -807,7 +821,7 @@ class QemuVM(BaseVM): :returns: boolean """ - return self._started + return self.status == "started" def read_stdout(self): """ diff --git a/gns3server/modules/vpcs/vpcs_vm.py b/gns3server/modules/vpcs/vpcs_vm.py index cb7229d7..ddb97865 100644 --- a/gns3server/modules/vpcs/vpcs_vm.py +++ b/gns3server/modules/vpcs/vpcs_vm.py @@ -35,7 +35,7 @@ from ..adapters.ethernet_adapter import EthernetAdapter from ..nios.nio_udp import NIOUDP from ..nios.nio_tap import NIOTAP from ..base_vm import BaseVM -from ...utils.asyncio import subprocess_check_output +from ...utils.asyncio import subprocess_check_output, monitor_process import logging @@ -109,6 +109,7 @@ class VPCSVM(BaseVM): return {"name": self.name, "vm_id": self.id, + "status": self.status, "console": self._console, "project_id": self.project.id, "startup_script": self.startup_script, @@ -233,13 +234,27 @@ class VPCSVM(BaseVM): stderr=subprocess.STDOUT, cwd=self.working_dir, creationflags=flags) + monitor_process(self._process, self._termination_callback) log.info("VPCS instance {} started PID={}".format(self.name, self._process.pid)) self._started = True + self.status = "started" except (OSError, subprocess.SubprocessError) as e: vpcs_stdout = self.read_vpcs_stdout() log.error("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout)) raise VPCSError("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout)) + def _termination_callback(self, returncode): + """ + Called when the process is killed + + :param returncode: Process returncode + """ + if self._started: + log.info("Process VPCS is dead. Return code: %d", returncode) + self._started = False + self.status = "stopped" + self._process = None + @asyncio.coroutine def stop(self): """ @@ -258,6 +273,7 @@ class VPCSVM(BaseVM): self._process = None self._started = False + self.status = "stopped" @asyncio.coroutine def reload(self): diff --git a/gns3server/schemas/vpcs.py b/gns3server/schemas/vpcs.py index 05d60d98..c2dba3d9 100644 --- a/gns3server/schemas/vpcs.py +++ b/gns3server/schemas/vpcs.py @@ -92,6 +92,10 @@ VPCS_OBJECT_SCHEMA = { "maxLength": 36, "pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$" }, + "status": { + "description": "VM status", + "enum": ["started", "stopped"] + }, "console": { "description": "console TCP port", "minimum": 1, @@ -115,5 +119,5 @@ VPCS_OBJECT_SCHEMA = { }, }, "additionalProperties": False, - "required": ["name", "vm_id", "console", "project_id", "startup_script_path"] + "required": ["name", "vm_id", "status", "console", "project_id", "startup_script_path"] } diff --git a/gns3server/utils/asyncio.py b/gns3server/utils/asyncio.py index c84e0fbe..7c9cc606 100644 --- a/gns3server/utils/asyncio.py +++ b/gns3server/utils/asyncio.py @@ -17,6 +17,8 @@ import asyncio +import shutil +import sys @asyncio.coroutine @@ -76,3 +78,19 @@ def wait_for_process_termination(process, timeout=10): yield from asyncio.sleep(0.1) timeout -= 0.1 raise asyncio.TimeoutError() + + +@asyncio.coroutine +def _check_process(process, termination_callback): + if not hasattr(sys, "_called_from_test") or not sys._called_from_test: + returncode = yield from process.wait() + if asyncio.iscoroutinefunction(termination_callback): + yield from termination_callback(returncode) + else: + termination_callback(returncode) + + +def monitor_process(process, termination_callback): + """Call termination_callback when process die""" + + asyncio.async(_check_process(process, termination_callback)) diff --git a/tests/handlers/api/test_project.py b/tests/handlers/api/test_project.py index cd0bc419..604ec756 100644 --- a/tests/handlers/api/test_project.py +++ b/tests/handlers/api/test_project.py @@ -20,9 +20,13 @@ This test suite check /project endpoint """ import uuid +import asyncio +import aiohttp from unittest.mock import patch from tests.utils import asyncio_patch +from gns3server.handlers.api.project_handler import ProjectHandler + def test_create_project_with_path(server, tmpdir): with patch("gns3server.modules.project.Project.is_local", return_value=True): @@ -139,6 +143,38 @@ def test_close_project(server, project): assert mock.called +def test_close_project_two_client_connected(server, project): + + ProjectHandler._notifications_listening = 2 + + with asyncio_patch("gns3server.modules.project.Project.close", return_value=True) as mock: + response = server.post("/projects/{project_id}/close".format(project_id=project.id), example=True) + assert response.status == 204 + assert not mock.called + + def test_close_project_invalid_uuid(server): response = server.post("/projects/{project_id}/close".format(project_id=uuid.uuid4())) assert response.status == 404 + + +def test_notification(server, project, loop): + @asyncio.coroutine + def go(future): + response = yield from aiohttp.request("GET", server.get_url("/projects/{project_id}/notifications".format(project_id=project.id), 1)) + response.body = yield from response.content.read(19) + project.emit("vm.created", {"a": "b"}) + response.body += yield from response.content.read(47) + response.close() + future.set_result(response) + + future = asyncio.Future() + asyncio.async(go(future)) + response = loop.run_until_complete(future) + assert response.status == 200 + assert response.body == b'{"action": "ping"}\n{"action": "vm.created", "event": {"a": "b"}}\n' + + +def test_notification_invalid_id(server, project): + response = server.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4())) + assert response.status == 404 diff --git a/tests/handlers/api/test_vpcs.py b/tests/handlers/api/test_vpcs.py index 473a533e..5970306e 100644 --- a/tests/handlers/api/test_vpcs.py +++ b/tests/handlers/api/test_vpcs.py @@ -42,7 +42,8 @@ def test_vpcs_get(server, project, vm): assert response.route == "/projects/{project_id}/vpcs/vms/{vm_id}" assert response.json["name"] == "PC TEST 1" assert response.json["project_id"] == project.id - assert response.json["startup_script_path"] == None + assert response.json["startup_script_path"] is None + assert response.json["status"] == "stopped" def test_vpcs_create_startup_script(server, project): @@ -95,6 +96,7 @@ def test_vpcs_delete_nio(server, vm): def test_vpcs_start(server, vm): + with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.start", return_value=True) as mock: response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True) assert mock.called diff --git a/tests/modules/vpcs/test_vpcs_vm.py b/tests/modules/vpcs/test_vpcs_vm.py index c16e10f0..a33d4397 100644 --- a/tests/modules/vpcs/test_vpcs_vm.py +++ b/tests/modules/vpcs/test_vpcs_vm.py @@ -72,6 +72,7 @@ def test_vm_invalid_vpcs_path(project, manager, loop): def test_start(loop, vm): process = MagicMock() process.returncode = None + queue = vm.project.get_listen_queue() with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True): with asyncio_patch("asyncio.create_subprocess_exec", return_value=process): @@ -79,6 +80,9 @@ def test_start(loop, vm): vm.port_add_nio_binding(0, nio) loop.run_until_complete(asyncio.async(vm.start())) assert vm.is_running() + (action, event) = queue.get_nowait() + assert action == "vm.started" + assert event == vm def test_stop(loop, vm): @@ -98,6 +102,8 @@ def test_stop(loop, vm): loop.run_until_complete(asyncio.async(vm.start())) assert vm.is_running() + queue = vm.project.get_listen_queue() + with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"): loop.run_until_complete(asyncio.async(vm.stop())) assert vm.is_running() is False @@ -107,6 +113,10 @@ def test_stop(loop, vm): else: process.terminate.assert_called_with() + (action, event) = queue.get_nowait() + assert action == "vm.stopped" + assert event == vm + def test_reload(loop, vm): process = MagicMock() diff --git a/tox.ini b/tox.ini index 155cefab..8c8c31ae 100644 --- a/tox.ini +++ b/tox.ini @@ -10,4 +10,4 @@ ignore = E501,E402 [pytest] norecursedirs = old_tests .tox -timeout = 2 +timeout = 5