It's was hard but i have finally a beginning of test for start VPCS

This commit is contained in:
Julien Duponchelle 2015-01-15 13:02:43 +01:00
parent 0afea48e63
commit 6bb2b88f1a
6 changed files with 62 additions and 47 deletions

View File

@ -75,7 +75,7 @@ class BaseVM:
"""
if console in self._allocated_console_ports:
raise VPCSError("Console port {} is already used by another VPCS device".format(console))
raise DeviceError("Console port {} is already used by another VM device".format(console))
self._allocated_console_ports.remove(self._console)
self._console = console
@ -146,8 +146,6 @@ class BaseVM:
"""
raise NotImplementedError
def put(self, *args):
"""
Add to the processing queue of the VM

View File

@ -87,16 +87,13 @@ class VPCSDevice(BaseVM):
# # create the device own working directory
# self.working_dir = working_dir_path
#
self._check_requirements()
super().__init__(name, vpcs_id)
@asyncio.coroutine
def _create(self):
"""Called when run loop is started"""
self._check_requirement()
def _check_requirement(self):
"""Check if VPCS is available with the correct version"""
def _check_requirements(self):
"""
Check if VPCS is available with the correct version
"""
if not self._path:
raise VPCSError("No path to a VPCS executable has been set")
@ -106,30 +103,7 @@ class VPCSDevice(BaseVM):
if not os.access(self._path, os.X_OK):
raise VPCSError("VPCS program '{}' is not executable".format(self._path))
yield from self._check_vpcs_version()
def defaults(self):
"""
Returns all the default attribute values for VPCS.
:returns: default values (dictionary)
"""
vpcs_defaults = {"name": self._name,
"script_file": self._script_file,
"console": self._console}
return vpcs_defaults
@classmethod
def reset(cls):
"""
Resets allocated instance list.
"""
cls._instances.clear()
cls._allocated_console_ports.clear()
self._check_vpcs_version()
@property
def name(self):
@ -167,7 +141,6 @@ class VPCSDevice(BaseVM):
new_name=new_name))
self._name = new_name
@asyncio.coroutine
def _check_vpcs_version(self):
"""
Checks if the VPCS executable version is >= 0.5b1.
@ -185,6 +158,9 @@ class VPCSDevice(BaseVM):
except (OSError, subprocess.SubprocessError) as e:
raise VPCSError("Error while looking for the VPCS version: {}".format(e))
@asyncio.coroutine
def _create(self):
pass
@asyncio.coroutine
def start(self):
@ -204,6 +180,7 @@ class VPCSDevice(BaseVM):
flags = 0
if sys.platform.startswith("win32"):
flags = subprocess.CREATE_NEW_PROCESS_GROUP
yield from asyncio.create_subprocess_exec()
with open(self._vpcs_stdout_file, "w") as fd:
self._process = yield from asyncio.create_subprocess_exec(*self._command,
stdout=fd,
@ -241,7 +218,7 @@ class VPCSDevice(BaseVM):
Reads the standard output of the VPCS process.
Only use when the process has been stopped or has crashed.
"""
#TODO: should be async
output = ""
if self._vpcs_stdout_file:
try:

View File

@ -52,7 +52,7 @@ def test_version_invalid_input_schema(server):
@asyncio_patch("gns3server.handlers.version_handler.VersionHandler", return_value={})
def test_version_invalid_output_schema():
def test_version_invalid_output_schema(server):
query = {'version': "0.4.2"}
response = server.post('/version', query)
assert response.status == 400

View File

@ -21,7 +21,7 @@ from gns3server import modules
@asyncio_patch('gns3server.modules.VPCS.create_vm', return_value=84)
def test_vpcs_create(server, mock):
def test_vpcs_create(server):
response = server.post('/vpcs', {'name': 'PC TEST 1'}, example=False)
assert response.status == 200
assert response.route == '/vpcs'

View File

@ -16,7 +16,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
from unittest.mock import patch
import asyncio
from tests.utils import asyncio_patch
#Move loop to util
from tests.api.base import loop
from asyncio.subprocess import Process
from unittest.mock import patch, Mock
from gns3server.modules.vpcs.vpcs_device import VPCSDevice
from gns3server.modules.vpcs.vpcs_error import VPCSError
@ -39,3 +45,9 @@ def test_vm_invalid_vpcs_path(tmpdir):
assert vm.name == "test"
assert vm.id == 42
def test_start(tmpdir, loop):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=Mock()):
vm = VPCSDevice("test", 42, working_dir=str(tmpdir), path="/bin/test_fake")
loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running() == True

View File

@ -19,15 +19,43 @@ import asyncio
from unittest.mock import patch
def asyncio_patch(function, *args, **kwargs):
@asyncio.coroutine
def fake_anwser(*a, **kw):
return kwargs["return_value"]
class _asyncio_patch:
"""
A wrapper around python patch supporting asyncio.
Like the original patch you can use it as context
manager (with) or decorator
def register(func):
@patch(function, return_value=fake_anwser)
The original patch source code is the main source of
inspiration:
https://hg.python.org/cpython/file/3.4/Lib/unittest/mock.py
"""
def __init__(self, function, *args, **kwargs):
self.function = function
self.args = args
self.kwargs = kwargs
def __enter__(self):
"""Used when enter in the with block"""
self._patcher = patch(self.function, return_value=self._fake_anwser())
self._patcher.start()
def __exit__(self, *exc_info):
"""Used when leaving the with block"""
self._patcher.stop()
def __call__(self, func, *args, **kwargs):
"""Call is used when asyncio_patch is used as decorator"""
@patch(self.function, return_value=self._fake_anwser())
@asyncio.coroutine
def inner(*a, **kw):
return func(*a, **kw)
return inner
return register
def _fake_anwser(self):
future = asyncio.Future()
future.set_result(self.kwargs["return_value"])
return future
def asyncio_patch(function, *args, **kwargs):
return _asyncio_patch(function, *args, **kwargs)