From e28079096ee01c41d1dff2802a6f140ab7cee938 Mon Sep 17 00:00:00 2001 From: grossmj Date: Sat, 21 Dec 2013 17:34:51 -0700 Subject: [PATCH] JSON-RPC over Websockets implementation. --- README.rst | 2 +- gns3server/handlers/jsonrpc_websocket.py | 148 ++++++++++++++++++ gns3server/jsonrpc.py | 182 +++++++++++++++++++++++ gns3server/main.py | 2 +- gns3server/modules/base.py | 83 +++++++++-- gns3server/server.py | 14 +- tests/test_jsonrpc.py | 91 ++++++++++++ tests/test_stomp.py | 6 +- 8 files changed, 500 insertions(+), 28 deletions(-) create mode 100644 gns3server/handlers/jsonrpc_websocket.py create mode 100644 gns3server/jsonrpc.py create mode 100644 tests/test_jsonrpc.py diff --git a/README.rst b/README.rst index 0aacb51e..97f6078f 100644 --- a/README.rst +++ b/README.rst @@ -2,4 +2,4 @@ GNS3-server =========== GNS3 server manages emulators such as Dynamips, VirtualBox or Qemu/KVM. -Clients like the GNS3 GUI controls the server using a HTTP REST API. +Clients like the GNS3 GUI controls the server using an API over Websockets. diff --git a/gns3server/handlers/jsonrpc_websocket.py b/gns3server/handlers/jsonrpc_websocket.py new file mode 100644 index 00000000..59aa6352 --- /dev/null +++ b/gns3server/handlers/jsonrpc_websocket.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +JSON-RPC protocol over Websockets. +""" + +import zmq +import uuid +import tornado.websocket +from tornado.escape import json_decode +from ..jsonrpc import JSONRPCParseError, JSONRPCInvalidRequest, JSONRPCMethodNotFound + +import logging +log = logging.getLogger(__name__) + + +class JSONRPCWebSocket(tornado.websocket.WebSocketHandler): + """ + STOMP protocol over Tornado Websockets with message + routing to ZeroMQ dealer clients. + + :param application: Tornado Application object + :param request: Tornado Request object + :param zmq_router: ZeroMQ router socket + """ + + clients = set() + destinations = {} + version = 2.0 # only JSON-RPC version 2.0 is supported + + def __init__(self, application, request, zmq_router): + tornado.websocket.WebSocketHandler.__init__(self, application, request) + self._session_id = str(uuid.uuid4()) + self.zmq_router = zmq_router + + @property + def session_id(self): + """ + Session ID uniquely representing a Websocket client + + :returns: the session id + """ + + return self._session_id + + @classmethod + def dispatch_message(cls, message): + """ + Sends a message to Websocket client + + :param message: message from a module (received via ZeroMQ) + """ + + # Module name that is replying + module = message[0].decode("utf-8") + + # ZMQ responses are encoded in JSON + # format is a JSON array: [session ID, JSON-RPC response] + json_message = json_decode(message[1]) + session_id = json_message[0] + jsonrpc_response = json_message[1] + + log.debug("Received message from module {}: {}".format(module, json_message)) + + for client in cls.clients: + if client.session_id == session_id: + client.write_message(jsonrpc_response) + + @classmethod + def register_destination(cls, destination, module): + """ + Registers a destination handled by a module. + Used to route requests to the right module. + + :param destination: destination string + :param module: module string + """ + + # Make sure the destination is not already registered + # by another module for instance + assert destination not in cls.destinations + log.info("registering {} as a destination for {}".format(destination, + module)) + cls.destinations[destination] = module + + def open(self): + """ + Invoked when a new WebSocket is opened. + """ + + log.info("Websocket client {} connected".format(self.session_id)) + self.clients.add(self) + + def on_message(self, message): + """ + Handles incoming messages. + + :param message: message received over the Websocket + """ + + log.debug("Received Websocket message: {}".format(message)) + + try: + request = json_decode(message) + jsonrpc_version = request["jsonrpc"] + method = request["method"] + # warning: notifications cannot be sent by a client because check for an "id" here + request_id = request["id"] + except: + return self.write_message(JSONRPCParseError()()) + + if jsonrpc_version != self.version: + return self.write_message(JSONRPCInvalidRequest()()) + + if method not in self.destinations: + return self.write_message(JSONRPCMethodNotFound(request_id)()) + + module = self.destinations[method] + # ZMQ requests are encoded in JSON + # format is a JSON array: [session ID, JSON-RPC request] + zmq_request = [self.session_id, request] + # Route to the correct module + self.zmq_router.send_string(module, zmq.SNDMORE) + # Send the encoded JSON request + self.zmq_router.send_json(zmq_request) + + def on_close(self): + """ + Invoked when the WebSocket is closed. + """ + + log.info("Websocket client {} disconnected".format(self.session_id)) + self.clients.remove(self) diff --git a/gns3server/jsonrpc.py b/gns3server/jsonrpc.py new file mode 100644 index 00000000..21396b9d --- /dev/null +++ b/gns3server/jsonrpc.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +JSON-RPC protocol implementation. +http://www.jsonrpc.org/specification +""" + +import json +import uuid + + +class JSONRPCObject(object): + """ + Base object for JSON-RPC requests, responses, + notifications and errors. + """ + + def __init__(self): + return JSONRPCEncoder().default(self) + + def __str__(self, *args, **kwargs): + return json.dumps(self, cls=JSONRPCEncoder) + + def __call__(self): + return JSONRPCEncoder().default(self) + + +class JSONRPCEncoder(json.JSONEncoder): + """ + Creates the JSON-RPC message. + """ + + def default(self, obj): + """ + Returns a Python dictionary corresponding to a JSON-RPC message. + """ + + if isinstance(obj, JSONRPCObject): + message = {"jsonrpc": 2.0} + for field in dir(obj): + if not field.startswith('_'): + value = getattr(obj, field) + message[field] = value + return message + return json.JSONEncoder.default(self, obj) + + +class JSONRPCInvalidRequest(JSONRPCObject): + """ + Error response for an invalid request. + """ + + def __init__(self): + JSONRPCObject.__init__(self) + self.id = None + self.error = {"code": -32600, "message": "Invalid Request"} + + +class JSONRPCMethodNotFound(JSONRPCObject): + """ + Error response for an method not found. + + :param request_id: JSON-RPC identifier + """ + + def __init__(self, request_id): + JSONRPCObject.__init__(self) + self.id = request_id + self.error = {"code": -32601, "message": "Method not found"} + + +class JSONRPCInvalidParams(JSONRPCObject): + """ + Error response for invalid parameters. + + :param request_id: JSON-RPC identifier + """ + + def __init__(self, request_id): + JSONRPCObject.__init__(self) + self.id = request_id + self.error = {"code": -32602, "message": "Invalid params"} + + +class JSONRPCInternalError(JSONRPCObject): + """ + Error response for an internal error. + + :param request_id: JSON-RPC identifier (optional) + """ + + def __init__(self, request_id=None): + JSONRPCObject.__init__(self) + self.id = request_id + self.error = {"code": -32603, "message": "Internal error"} + + +class JSONRPCParseError(JSONRPCObject): + """ + Error response for parsing error. + """ + + def __init__(self): + JSONRPCObject.__init__(self) + self.id = None + self.error = {"code": -32700, "message": "Parse error"} + + +class JSONRPCCustomError(JSONRPCObject): + """ + Error response for an custom error. + + :param code: JSON-RPC error code + :param message: JSON-RPC error message + :param request_id: JSON-RPC identifier (optional) + """ + + def __init__(self, code, message, request_id=None): + JSONRPCObject.__init__(self) + self.id = request_id + self.error = {"code": code, "message": message} + + +class JSONRPCResponse(JSONRPCObject): + """ + JSON-RPC successful response. + + :param result: JSON-RPC result + :param request_id: JSON-RPC identifier + """ + + def __init__(self, result, request_id): + JSONRPCObject.__init__(self) + self.id = request_id + self.result = result + + +class JSONRPCRequest(JSONRPCObject): + """ + JSON-RPC request. + + :param method: JSON-RPC destination method + :param params: JSON-RPC params for the corresponding method (optional) + :param request_id: JSON-RPC identifier (generated by default) + """ + + def __init__(self, method, params=None, request_id=str(uuid.uuid1())): + JSONRPCObject.__init__(self) + self.id = request_id + self.method = method + if params: + self.params = params + + +class JSONRPCNotification(JSONRPCObject): + """ + JSON-RPC notification. + + :param method: JSON-RPC destination method + :param params: JSON-RPC params for the corresponding method (optional) + """ + + def __init__(self, method, params=None): + JSONRPCObject.__init__(self) + self.method = method + if params: + self.params = params diff --git a/gns3server/main.py b/gns3server/main.py index 5f176883..55b9ee3a 100644 --- a/gns3server/main.py +++ b/gns3server/main.py @@ -19,8 +19,8 @@ import datetime import sys import logging -import gns3server import tornado.options +import gns3server # command line options from tornado.options import define diff --git a/gns3server/modules/base.py b/gns3server/modules/base.py index 0023f9cb..b12b7a5f 100644 --- a/gns3server/modules/base.py +++ b/gns3server/modules/base.py @@ -19,6 +19,7 @@ Base class (interface) for modules """ +import gns3server.jsonrpc as jsonrpc import multiprocessing import zmq @@ -28,7 +29,11 @@ log = logging.getLogger(__name__) class IModule(multiprocessing.Process): """ - Module interface + Module interface. + + :param name: module name + :param args: arguments for the module + :param kwargs: named arguments for the module """ destination = {} @@ -47,6 +52,7 @@ class IModule(multiprocessing.Process): self._port = args[1] self._current_session = None self._current_destination = None + self._current_call_id = None def _setup(self): """ @@ -59,7 +65,9 @@ class IModule(multiprocessing.Process): def _create_stream(self, host=None, port=0, callback=None): """ - Creates a new ZMQ stream + Creates a new ZMQ stream. + + :returns: ZMQ stream object """ socket = self._context.socket(zmq.DEALER) @@ -98,27 +106,66 @@ class IModule(multiprocessing.Process): def stop(self): """ - Stops the event loop + Stops the event loop. """ - #zmq.eventloop.ioloop.IOLoop.instance().stop() self._ioloop.stop() - def send_response(self, response): + def send_response(self, results): """ - Sends a response back to the requester + Sends a response back to the requester. - :param response: + :param results: JSON results to the ZeroMQ server """ - # add session and destination to the response - response = [self._current_session, self._current_destination, response] + jsonrpc_response = jsonrpc.JSONRPCResponse(results, self._current_call_id)() + + # add session to the response + response = [self._current_session, jsonrpc_response] log.debug("ZeroMQ client ({}) sending: {}".format(self.name, response)) self._stream.send_json(response) + def send_param_error(self): + """ + Sends a param error back to the requester. + """ + + jsonrpc_response = jsonrpc.JSONRPCInvalidParams(self._current_call_id)() + + # add session to the response + response = [self._current_session, jsonrpc_response] + log.info("ZeroMQ client ({}) sending JSON-RPC param error for call id {}".format(self.name, self._current_call_id)) + self._stream.send_json(response) + + def send_internal_error(self): + """ + Sends a param error back to the requester. + """ + + jsonrpc_response = jsonrpc.JSONRPCInternalError()() + + # add session to the response + response = [self._current_session, jsonrpc_response] + log.critical("ZeroMQ client ({}) sending JSON-RPC internal error".format(self.name)) + self._stream.send_json(response) + + def send_custom_error(self, message, code=-3200): + """ + Sends a custom error back to the requester. + """ + + jsonrpc_response = jsonrpc.JSONRPCCustomError(code, message, self._current_call_id)() + + # add session to the response + response = [self._current_session, jsonrpc_response] + log.info("ZeroMQ client ({}) sending JSON-RPC custom error {} for call id {}".format(self.name, + message, + self._current_call_id)) + self._stream.send_json(response) + def _decode_request(self, request): """ - Decodes the request to JSON + Decodes the request to JSON. :param request: request from ZeroMQ server """ @@ -126,18 +173,22 @@ class IModule(multiprocessing.Process): try: request = zmq.utils.jsonapi.loads(request[0]) except ValueError: - self.send_response("ValueError") # FIXME: explicit json error + self._current_session = None + self.send_internal_error() return log.debug("ZeroMQ client ({}) received: {}".format(self.name, request)) self._current_session = request[0] - self._current_destination = request[1] + self._current_call_id = request[1].get("id") + destination = request[1].get("method") + params = request[1].get("params") - if self._current_destination not in self.destination: - # FIXME: return error if destination not found! + if destination not in self.destination: + self.send_internal_error() return - log.debug("Routing request to {}: {}".format(self._current_destination, request[2])) - self.destination[self._current_destination](self, request[2]) + + log.debug("Routing request to {}: {}".format(destination, request[1])) + self.destination[destination](self, params) def destinations(self): """ diff --git a/gns3server/server.py b/gns3server/server.py index 4a6059d7..00654f32 100644 --- a/gns3server/server.py +++ b/gns3server/server.py @@ -31,7 +31,7 @@ import socket import tornado.ioloop import tornado.web import tornado.autoreload -from .handlers.stomp_websocket import StompWebSocket +from .handlers.jsonrpc_websocket import JSONRPCWebSocket from .handlers.version_handler import VersionHandler from .module_manager import ModuleManager @@ -57,7 +57,7 @@ class Server(object): def load_modules(self): """ - Loads the modules + Loads the modules. """ cwd = os.path.dirname(os.path.abspath(__file__)) @@ -70,17 +70,17 @@ class Server(object): self._modules.append(instance) destinations = instance.destinations() for destination in destinations: - StompWebSocket.register_destination(destination, module.name) + JSONRPCWebSocket.register_destination(destination, module.name) instance.start() # starts the new process def run(self): """ - Starts the Tornado web server and ZeroMQ server + Starts the Tornado web server and ZeroMQ server. """ router = self._create_zmq_router() - # Add our Stomp Websocket handler to Tornado - self.handlers.extend([(r"/", StompWebSocket, dict(zmq_router=router))]) + # Add our JSON-RPC Websocket handler to Tornado + self.handlers.extend([(r"/", JSONRPCWebSocket, dict(zmq_router=router))]) tornado_app = tornado.web.Application(self.handlers, debug=True) # FIXME: debug mode! try: print("Starting server on port {}".format(self._port)) @@ -92,7 +92,7 @@ class Server(object): ioloop = tornado.ioloop.IOLoop.instance() stream = zmqstream.ZMQStream(router, ioloop) - stream.on_recv(StompWebSocket.dispatch_message) + stream.on_recv(JSONRPCWebSocket.dispatch_message) tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False)) def signal_handler(signum=None, frame=None): diff --git a/tests/test_jsonrpc.py b/tests/test_jsonrpc.py new file mode 100644 index 00000000..a58d4266 --- /dev/null +++ b/tests/test_jsonrpc.py @@ -0,0 +1,91 @@ +import uuid +from tornado.testing import AsyncTestCase +from tornado.escape import json_encode, json_decode +from ws4py.client.tornadoclient import TornadoWebSocketClient +import gns3server.jsonrpc as jsonrpc + +""" +Tests for JSON-RPC protocol over Websockets +""" + + +class JSONRPC(AsyncTestCase): + + URL = "ws://127.0.0.1:8000/" + + def test_request(self): + + params = {"echo": "test"} + request = jsonrpc.JSONRPCRequest("dynamips.echo", params) + AsyncWSRequest(self.URL, self.io_loop, self.stop, str(request)) + response = self.wait() + json_response = json_decode(response) + assert json_response["jsonrpc"] == 2.0 + assert json_response["id"] == request.id + assert json_response["result"] == params + + def test_request_with_invalid_method(self): + + message = {"echo": "test"} + request = jsonrpc.JSONRPCRequest("dynamips.non_existent", message) + AsyncWSRequest(self.URL, self.io_loop, self.stop, str(request)) + response = self.wait() + json_response = json_decode(response) + assert json_response["error"].get("code") == -32601 + assert json_response["id"] == request.id + + def test_request_with_invalid_version(self): + + request = {"jsonrpc": "1.0", "method": "dynamips.echo", "id": 1} + AsyncWSRequest(self.URL, self.io_loop, self.stop, json_encode(request)) + response = self.wait() + json_response = json_decode(response) + assert json_response["id"] == None + assert json_response["error"].get("code") == -32600 + + def test_request_with_invalid_json(self): + + request = "my non JSON request" + AsyncWSRequest(self.URL, self.io_loop, self.stop, request) + response = self.wait() + json_response = json_decode(response) + assert json_response["id"] == None + assert json_response["error"].get("code") == -32700 + + def test_request_with_invalid_jsonrpc_field(self): + + request = {"jsonrpc": "2.0", "method_bogus": "dynamips.echo", "id": 1} + AsyncWSRequest(self.URL, self.io_loop, self.stop, json_encode(request)) + response = self.wait() + json_response = json_decode(response) + assert json_response["id"] == None + assert json_response["error"].get("code") == -32700 + + def test_request_with_no_params(self): + + request = jsonrpc.JSONRPCRequest("dynamips.echo") + AsyncWSRequest(self.URL, self.io_loop, self.stop, str(request)) + response = self.wait() + json_response = json_decode(response) + assert json_response["id"] == request.id + assert json_response["error"].get("code") == -32602 + + +class AsyncWSRequest(TornadoWebSocketClient): + """ + Very basic Websocket client for tests + """ + + def __init__(self, url, io_loop, callback, message): + TornadoWebSocketClient.__init__(self, url, io_loop=io_loop) + self._callback = callback + self._message = message + self.connect() + + def opened(self): + self.send(self._message, binary=False) + + def received_message(self, message): + self.close() + if self._callback: + self._callback(message.data) diff --git a/tests/test_stomp.py b/tests/test_stomp.py index 443d4a1d..b6346e2c 100644 --- a/tests/test_stomp.py +++ b/tests/test_stomp.py @@ -6,7 +6,7 @@ from gns3server.stomp import frame as stomp_frame from gns3server.stomp import protocol as stomp_protocol """ -Tests STOMP protocol over Websockets +Tests for STOMP protocol over Websockets """ @@ -64,7 +64,7 @@ class Stomp(AsyncTestCase): and check for a STOMP MESSAGE with echoed message and destination. """ - destination = "dynamips/echo" + destination = "dynamips.echo" message = {"ping": "test"} request = self.stomp.send(destination, json_encode(message), "application/json") AsyncWSRequest(self.URL, self.io_loop, self.stop, request) @@ -109,7 +109,7 @@ class Stomp(AsyncTestCase): class AsyncWSRequest(TornadoWebSocketClient): """ - Very basic Websocket client for the tests + Very basic Websocket client for tests """ def __init__(self, url, io_loop, callback, message):