From 28ca12367a0107d9359c7ea7ab50178abe0ad918 Mon Sep 17 00:00:00 2001 From: grossmj Date: Sat, 26 Apr 2014 17:51:47 -0600 Subject: [PATCH] Proper server shutdown. --- gns3server/handlers/jsonrpc_websocket.py | 6 +++- gns3server/modules/base.py | 39 ++++++++++++++++++------ gns3server/modules/dynamips/__init__.py | 7 +++-- gns3server/modules/iou/__init__.py | 6 ++-- gns3server/server.py | 39 ++++++++++++++++-------- 5 files changed, 70 insertions(+), 27 deletions(-) diff --git a/gns3server/handlers/jsonrpc_websocket.py b/gns3server/handlers/jsonrpc_websocket.py index fa2b2fe9..ecd91cf7 100644 --- a/gns3server/handlers/jsonrpc_websocket.py +++ b/gns3server/handlers/jsonrpc_websocket.py @@ -126,6 +126,10 @@ class JSONRPCWebSocket(tornado.websocket.WebSocketHandler): """ log.debug("Received Websocket message: {}".format(message)) + + if self.zmq_router.closed: + # no need to proceed, the ZeroMQ router has been closed. + return try: request = json_decode(message) @@ -170,7 +174,7 @@ class JSONRPCWebSocket(tornado.websocket.WebSocketHandler): # Reset the modules if there are no clients anymore # Modules must implement a reset destination - if not self.clients: + if not self.clients and not self.zmq_router.closed: for destination, module in self.destinations.items(): if destination.endswith("reset"): # Route to the correct module diff --git a/gns3server/modules/base.py b/gns3server/modules/base.py index 5e08c053..95d7e35e 100644 --- a/gns3server/modules/base.py +++ b/gns3server/modules/base.py @@ -49,6 +49,7 @@ class IModule(multiprocessing.Process): self._context = None self._ioloop = None self._stream = None + self._dealer = None self._zmq_host = args[0] # ZeroMQ server address self._zmq_port = args[1] # ZeroMQ server port self._current_session = None @@ -72,24 +73,24 @@ class IModule(multiprocessing.Process): :returns: ZMQ stream instance """ - socket = self._context.socket(zmq.DEALER) - socket.setsockopt(zmq.IDENTITY, self.name.encode("utf-8")) + self._dealer = self._context.socket(zmq.DEALER) + self._dealer.setsockopt(zmq.IDENTITY, self.name.encode("utf-8")) if host and port: log.info("ZeroMQ client ({}) connecting to {}:{}".format(self.name, host, port)) try: - socket.connect("tcp://{}:{}".format(host, port)) + self._dealer.connect("tcp://{}:{}".format(host, port)) except zmq.error.ZMQError as e: log.critical("Could not connect to ZeroMQ server on {}:{}, reason: {}".format(host, port, e)) raise SystemExit else: log.info("ZeroMQ client ({}) connecting to ipc:///tmp/gns3.ipc".format(self.name)) try: - socket.connect("ipc:///tmp/gns3.ipc") + self._dealer.connect("ipc:///tmp/gns3.ipc") except zmq.error.ZMQError as e: log.critical("Could not connect to ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) raise SystemExit - stream = zmq.eventloop.zmqstream.ZMQStream(socket, self._ioloop) + stream = zmq.eventloop.zmqstream.ZMQStream(self._dealer, self._ioloop) if callback: stream.on_recv(callback) return stream @@ -112,7 +113,7 @@ class IModule(multiprocessing.Process): def signal_handler(signum=None, frame=None): log.warning("Module {} got signal {}, exiting...".format(self.name, signum)) - self.stop() + self.stop(signum) signals = [signal.SIGTERM, signal.SIGINT] if not sys.platform.startswith("win"): @@ -131,14 +132,34 @@ class IModule(multiprocessing.Process): log.info("{} module has stopped".format(self.name)) - def stop(self): + def _shutdown(self): """ - Stops the event loop. + Shutdowns the I/O loop and the ZeroMQ stream & socket + """ + + self._ioloop.stop() + + if self._stream and not self._stream.closed: + # close the zeroMQ stream + self._stream.close() + + if self._dealer and not self._dealer.closed: + # close the ZeroMQ dealer socket + self._dealer.close() + + def stop(self, signum=None): + """ + Adds a callback to stop the event loop & ZeroMQ. + + :param signum: signal number (if called by the signal handler) """ if not self._stopping: self._stopping = True - self._ioloop.add_callback_from_signal(self._ioloop.stop) + if signum: + self._ioloop.add_callback_from_signal(self._shutdown) + else: + self._shutdown() def send_response(self, results): """ diff --git a/gns3server/modules/dynamips/__init__.py b/gns3server/modules/dynamips/__init__.py index e7944152..d6e08698 100644 --- a/gns3server/modules/dynamips/__init__.py +++ b/gns3server/modules/dynamips/__init__.py @@ -122,16 +122,19 @@ class Dynamips(IModule): self._callback = self.add_periodic_callback(self._check_hypervisors, 5000) self._callback.start() - def stop(self): + def stop(self, signum=None): """ Properly stops the module. + + :param signum: signal number (if called by the signal handler) """ if not sys.platform.startswith("win32"): self._callback.stop() if self._hypervisor_manager: self._hypervisor_manager.stop_all_hypervisors() - IModule.stop(self) # this will stop the I/O loop + + IModule.stop(self, signum) # this will stop the I/O loop def _check_hypervisors(self): """ diff --git a/gns3server/modules/iou/__init__.py b/gns3server/modules/iou/__init__.py index 5e896e08..f13d7de9 100644 --- a/gns3server/modules/iou/__init__.py +++ b/gns3server/modules/iou/__init__.py @@ -104,9 +104,11 @@ class IOU(IModule): self._iou_callback = self.add_periodic_callback(self._check_iou_is_alive, 5000) self._iou_callback.start() - def stop(self): + def stop(self, signum=None): """ Properly stops the module. + + :param signum: signal number (if called by the signal handler) """ self._iou_callback.stop() @@ -115,7 +117,7 @@ class IOU(IModule): iou_instance = self._iou_instances[iou_id] iou_instance.delete() - IModule.stop(self) # this will stop the I/O loop + IModule.stop(self, signum) # this will stop the I/O loop def _check_iou_is_alive(self): """ diff --git a/gns3server/server.py b/gns3server/server.py index 961dbdea..e8104fc7 100644 --- a/gns3server/server.py +++ b/gns3server/server.py @@ -55,6 +55,9 @@ class Server(object): self._host = host self._port = port + self._router = None + self._stream = None + if ipc: self._zmq_port = 0 # this forces to use IPC for communications with the ZeroMQ server else: @@ -156,13 +159,13 @@ class Server(object): self._cleanup() ioloop = tornado.ioloop.IOLoop.instance() - stream = zmqstream.ZMQStream(router, ioloop) - stream.on_recv_stream(JSONRPCWebSocket.dispatch_message) + self._stream = zmqstream.ZMQStream(router, ioloop) + self._stream.on_recv_stream(JSONRPCWebSocket.dispatch_message) tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False)) def signal_handler(signum=None, frame=None): log.warning("Server got signal {}, exiting...".format(signum)) - self._cleanup() + self._cleanup(signum) signals = [signal.SIGTERM, signal.SIGINT] if not sys.platform.startswith("win"): @@ -188,10 +191,10 @@ class Server(object): context = zmq.Context() context.linger = 0 - router = context.socket(zmq.ROUTER) + self._router = context.socket(zmq.ROUTER) if self._ipc: try: - router.bind("ipc:///tmp/gns3.ipc") + self._router.bind("ipc:///tmp/gns3.ipc") except zmq.error.ZMQError as e: log.critical("Could not start ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) self._cleanup() @@ -199,34 +202,41 @@ class Server(object): log.info("ZeroMQ server listening to ipc:///tmp/gns3.ipc") else: try: - router.bind("tcp://127.0.0.1:{}".format(self._zmq_port)) + self._router.bind("tcp://127.0.0.1:{}".format(self._zmq_port)) except zmq.error.ZMQError as e: log.critical("Could not start ZeroMQ server on 127.0.0.1:{}, reason: {}".format(self._zmq_port, e)) self._cleanup() raise SystemExit log.info("ZeroMQ server listening to 127.0.0.1:{}".format(self._zmq_port)) - return router + return self._router def _shutdown(self): """ - Shutdowns the I/O loop. + Shutdowns the I/O loop and the ZeroMQ stream & socket. """ + if self._stream and not self._stream.closed: + # close the ZeroMQ stream + self._stream.close() + + if self._router and not self._router.closed: + # close the ZeroMQ router socket + self._router.close() + ioloop = tornado.ioloop.IOLoop.instance() ioloop.stop() - def _cleanup(self, stop=True): + def _cleanup(self, signum=None, stop=True): """ Shutdowns any running module processes - and close remaining Tornado ioloop file descriptors + and adds a callback to stop the event loop & ZeroMQ + :param signum: signal number (if called by the signal handler) :param stop: stops the ioloop if True (default) """ # terminate all modules for module in self._modules: -# if not sys.platform.startswith("win"): -# module.join(timeout=0.5) if module.is_alive(): log.info("terminating {}".format(module.name)) module.terminate() @@ -234,4 +244,7 @@ class Server(object): if stop: ioloop = tornado.ioloop.IOLoop.instance() - ioloop.add_callback_from_signal(self._shutdown) + if signum: + ioloop.add_callback_from_signal(self._shutdown) + else: + self._shutdown()