gns3-server/tests/test_stomp.py
2013-12-05 21:39:27 -07:00

128 lines
4.2 KiB
Python

import uuid
from tornado.testing import AsyncTestCase
from tornado.escape import json_encode, json_decode
from ws4py.client.tornadoclient import TornadoWebSocketClient
from gns3server.stomp import frame as stomp_frame
from gns3server.stomp import protocol as stomp_protocol
"""
Tests STOMP protocol over Websockets
"""
class Stomp(AsyncTestCase):
    """
    Tests the STOMP protocol over Websockets.

    Each test sends one request to the server at URL and checks the
    frame parsed from the reply. Nothing in this class starts a server,
    so one has to be listening at URL already.
    """

    URL = "ws://127.0.0.1:8000/"

    def setUp(self):
        """
        Creates the STOMP client protocol helper used to build requests,
        then runs the base class set up.
        """
        self.stomp = stomp_protocol.clientProtocol()
        super(Stomp, self).setUp()

    def _exchange(self, request):
        """
        Sends a request over a Websocket and returns the reply
        parsed as a STOMP frame.

        :param request: payload to send to the server
        :returns: frame parsed from the UTF-8 decoded reply
        """
        # The client passes the data of the message it receives to
        # self.stop, which is what self.wait() then returns.
        AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
        response = self.wait()
        # unittest assertions are used instead of bare asserts so the
        # checks are still performed when Python runs with -O.
        self.assertTrue(response)
        return stomp_frame.Frame.parse_frame(response.decode("utf-8"))

    def test_connect(self):
        """
        Sends a STOMP CONNECT frame and
        checks for a STOMP CONNECTED frame.
        """
        frame = self._exchange(self.stomp.connect("localhost"))
        self.assertEqual(frame.cmd, stomp_protocol.CMD_CONNECTED)

    def test_protocol_negotiation_failure(self):
        """
        Sends a STOMP CONNECT frame with protocol version 1.0 required
        and checks for a STOMP ERROR sent back by the server which supports
        STOMP version 1.2 only.
        """
        request = self.stomp.connect("localhost", accept_version='1.0')
        frame = self._exchange(request)
        self.assertEqual(frame.cmd, stomp_protocol.CMD_ERROR)

    def test_malformed_frame(self):
        """
        Sends an empty frame and checks for a STOMP ERROR.
        """
        frame = self._exchange(b"")
        self.assertEqual(frame.cmd, stomp_protocol.CMD_ERROR)

    def test_send(self):
        """
        Sends a STOMP SEND frame with a message and a destination
        and checks for a STOMP MESSAGE with echoed message and destination.
        """
        destination = "dynamips/echo"
        message = {"ping": "test"}
        request = self.stomp.send(destination, json_encode(message), "application/json")
        frame = self._exchange(request)
        self.assertEqual(frame.cmd, stomp_protocol.CMD_MESSAGE)
        self.assertEqual(frame.headers[stomp_protocol.HDR_DESTINATION], destination)
        json_reply = json_decode(frame.body)
        self.assertEqual(message, json_reply)

    def test_unimplemented_frame(self):
        """
        Sends a STOMP BEGIN frame which is not implemented by the server
        and checks for a STOMP ERROR frame.
        """
        request = stomp_frame.Frame(stomp_protocol.CMD_BEGIN).encode()
        frame = self._exchange(request)
        self.assertEqual(frame.cmd, stomp_protocol.CMD_ERROR)

    def test_disconnect(self):
        """
        Sends a STOMP DISCONNECT frame with a receipt id
        and checks for a STOMP RECEIPT frame with the same receipt id
        confirming the disconnection.
        """
        myid = str(uuid.uuid4())
        frame = self._exchange(self.stomp.disconnect(myid))
        self.assertEqual(frame.cmd, stomp_protocol.CMD_RECEIPT)
        self.assertEqual(frame.headers[stomp_protocol.HDR_RECEIPT_ID], myid)


class AsyncWSRequest(TornadoWebSocketClient):
    """
    Bare-bones Websocket client for the tests: it starts connecting as
    soon as it is created, sends a single stored message from its
    opened() hook and passes the data of a received message to a callback.
    """

    def __init__(self, url, io_loop, callback, message):
        """
        Stores the message and the callback, then starts the connection.

        :param url: Websocket URL handed to TornadoWebSocketClient
        :param io_loop: forwarded to TornadoWebSocketClient as io_loop
        :param callback: called with the data of a received message,
        skipped when falsy
        :param message: payload sent by opened()
        """
        super(AsyncWSRequest, self).__init__(url, io_loop=io_loop)
        self._message = message
        self._callback = callback
        self.connect()

    def opened(self):
        """
        Sends the stored message as a non-binary message.
        """
        self.send(self._message, binary=False)

    def received_message(self, message):
        """
        Closes the connection, then gives the message data to the
        callback if there is one.
        """
        self.close()
        on_reply = self._callback
        if not on_reply:
            return
        on_reply(message.data)