Merge branch '2.2' of github.com:GNS3/gns3-server into 2.2

This commit is contained in:
ziajka 2019-03-21 10:39:55 +01:00
commit cbb6eccad8
22 changed files with 197 additions and 148 deletions

View File

@ -1,5 +1,27 @@
# Change Log
## 2.2.0a2 14/03/2019
* Web-UI v2019.1.0-alpha.1
* Update docs for update-bundled-web-ui.sh
* Fix issue when loading and quickly closing a project and opening it again. Fixes #1501.
* Disable unreliable nested virtualization check.
* Fix issue not checking build number on Windows.
* Change Hyper-V requirement checks.
* Early support for symbol themes.
* Re-order handlers in order to prevent CORS
* Download custom appliance symbols from GitHub Fix symbol cache issue. Ref https://github.com/GNS3/gns3-gui/issues/2671 Fix temporary directory for symbols was not deleted Fix temporary appliance file was not deleted
* Option to export snapshots.
* Support tags versioned WebUI when bundling
* Support selecting a compression type when exporting a project.
* Change how VPCS executable is searched.
* Use aiofiles where relevant.
* Update paths for binaries moved to the MacOS directory in GNS3.app
* Locked state should not be used when duplicating a node.
* Handle locking/unlocking items independently from the layer position.
* Use aiozipstream for snapshots. Fix tests.
* Project duplication support.
## 2.2.0a1 29/01/2019
* Restore reload support for nodes.

View File

@ -22,7 +22,7 @@ http://github.com/GNS3/dynamips/blob/master/README.hypervisor#L558
import asyncio
from gns3server.utils import parse_version
from gns3server.utils.asyncio.embed_shell import EmbedShell, create_telnet_shell
#from gns3server.utils.asyncio.embed_shell import EmbedShell, create_telnet_shell
from .device import Device
@ -34,36 +34,36 @@ import logging
log = logging.getLogger(__name__)
class EthernetSwitchConsole(EmbedShell):
"""
Console for the ethernet switch
"""
def __init__(self, node):
super().__init__(welcome_message="Welcome to GNS3 builtin Ethernet switch.\n\nType help for available commands\n")
self._node = node
async def mac(self):
"""
Show MAC address table
"""
res = 'Port Mac VLAN\n'
result = (await self._node._hypervisor.send('ethsw show_mac_addr_table {}'.format(self._node.name)))
for line in result:
mac, vlan, nio = line.replace(' ', ' ').split(' ')
mac = mac.replace('.', '')
mac = "{}:{}:{}:{}:{}:{}".format(
mac[0:2],
mac[2:4],
mac[4:6],
mac[6:8],
mac[8:10],
mac[10:12])
for port_number, switch_nio in self._node.nios.items():
if switch_nio.name == nio:
res += 'Ethernet' + str(port_number) + ' ' + mac + ' ' + vlan + '\n'
break
return res
# class EthernetSwitchConsole(EmbedShell):
# """
# Console for the ethernet switch
# """
#
# def __init__(self, node):
# super().__init__(welcome_message="Welcome to GNS3 builtin Ethernet switch.\n\nType help for available commands\n")
# self._node = node
#
# async def mac(self):
# """
# Show MAC address table
# """
# res = 'Port Mac VLAN\n'
# result = (await self._node._hypervisor.send('ethsw show_mac_addr_table {}'.format(self._node.name)))
# for line in result:
# mac, vlan, nio = line.replace(' ', ' ').split(' ')
# mac = mac.replace('.', '')
# mac = "{}:{}:{}:{}:{}:{}".format(
# mac[0:2],
# mac[2:4],
# mac[4:6],
# mac[6:8],
# mac[8:10],
# mac[10:12])
# for port_number, switch_nio in self._node.nios.items():
# if switch_nio.name == nio:
# res += 'Ethernet' + str(port_number) + ' ' + mac + ' ' + vlan + '\n'
# break
# return res
class EthernetSwitch(Device):
@ -85,8 +85,8 @@ class EthernetSwitch(Device):
self._nios = {}
self._mappings = {}
self._telnet_console = None
self._telnet_shell = None
self._telnet_server = None
#self._telnet_shell = None
#self._telnet_server = None
self._console = console
self._console_type = console_type
@ -177,13 +177,13 @@ class EthernetSwitch(Device):
await self._hypervisor.send('ethsw create "{}"'.format(self._name))
log.info('Ethernet switch "{name}" [{id}] has been created'.format(name=self._name, id=self._id))
self._telnet_shell = EthernetSwitchConsole(self)
self._telnet_shell.prompt = self._name + '> '
self._telnet = create_telnet_shell(self._telnet_shell)
try:
self._telnet_server = (await asyncio.start_server(self._telnet.run, self._manager.port_manager.console_host, self.console))
except OSError as e:
self.project.emit("log.warning", {"message": "Could not start Telnet server on socket {}:{}: {}".format(self._manager.port_manager.console_host, self.console, e)})
#self._telnet_shell = EthernetSwitchConsole(self)
#self._telnet_shell.prompt = self._name + '> '
#self._telnet = create_telnet_shell(self._telnet_shell)
#try:
# self._telnet_server = (await asyncio.start_server(self._telnet.run, self._manager.port_manager.console_host, self.console))
#except OSError as e:
# self.project.emit("log.warning", {"message": "Could not start Telnet server on socket {}:{}: {}".format(self._manager.port_manager.console_host, self.console, e)})
self._hypervisor.devices.append(self)
async def set_name(self, new_name):
@ -227,9 +227,9 @@ class EthernetSwitch(Device):
Deletes this Ethernet switch.
"""
await self._telnet.close()
if self._telnet_server:
self._telnet_server.close()
#await self._telnet.close()
#if self._telnet_server:
# self._telnet_server.close()
for nio in self._nios.values():
if nio:

View File

@ -674,6 +674,13 @@ class IOUVM(BaseNode):
pass
self._iou_process = None
try:
symlink = os.path.join(self.working_dir, os.path.basename(self.path))
if os.path.islink(symlink):
os.unlink(symlink)
except OSError as e:
log.warning("Could not delete symbolic link: {}".format(e))
self._started = False
self.save_configs()

View File

@ -407,7 +407,7 @@ class Project:
"""
files = []
for dirpath, dirnames, filenames in os.walk(self.path):
for dirpath, dirnames, filenames in os.walk(self.path, followlinks=False):
for filename in filenames:
if not filename.endswith(".ghost"):
path = os.path.relpath(dirpath, self.path)

View File

@ -20,6 +20,7 @@ VirtualBox server module.
"""
import os
import re
import sys
import shutil
import asyncio
@ -177,14 +178,17 @@ class VirtualBox(BaseManager):
for line in result:
if len(line) == 0 or line[0] != '"' or line[-1:] != "}":
continue # Broken output (perhaps a carriage return in VM name)
vmname, _ = line.rsplit(' ', 1)
vmname = vmname.strip('"')
match = re.search(r"\"(.*)\"\ {(.*)}", line)
if not match:
continue
vmname = match.group(1)
uuid = match.group(2)
if vmname == "<inaccessible>":
continue # ignore inaccessible VMs
extra_data = await self.execute("getextradata", [vmname, "GNS3/Clone"])
extra_data = await self.execute("getextradata", [uuid, "GNS3/Clone"])
if allow_clone or len(extra_data) == 0 or not extra_data[0].strip() == "Value: yes":
# get the amount of RAM
info_results = await self.execute("showvminfo", [vmname, "--machinereadable"])
info_results = await self.execute("showvminfo", [uuid, "--machinereadable"])
ram = 0
for info in info_results:
try:

View File

@ -57,6 +57,7 @@ class VirtualBoxVM(BaseNode):
super().__init__(name, node_id, project, manager, console=console, linked_clone=linked_clone, console_type=console_type)
self._uuid = None # UUID in VirtualBox
self._maximum_adapters = 8
self._system_properties = {}
self._telnet_server = None
@ -116,7 +117,7 @@ class VirtualBoxVM(BaseNode):
:returns: state (string)
"""
results = await self.manager.execute("showvminfo", [self._vmname, "--machinereadable"])
results = await self.manager.execute("showvminfo", [self._uuid, "--machinereadable"])
for info in results:
if '=' in info:
name, value = info.split('=', 1)
@ -134,7 +135,7 @@ class VirtualBoxVM(BaseNode):
"""
args = shlex.split(params)
result = await self.manager.execute("controlvm", [self._vmname] + args)
result = await self.manager.execute("controlvm", [self._uuid] + args)
return result
async def _modify_vm(self, params):
@ -145,7 +146,7 @@ class VirtualBoxVM(BaseNode):
"""
args = shlex.split(params)
await self.manager.execute("modifyvm", [self._vmname] + args)
await self.manager.execute("modifyvm", [self._uuid] + args)
async def _check_duplicate_linked_clone(self):
"""
@ -174,6 +175,7 @@ class VirtualBoxVM(BaseNode):
await asyncio.sleep(1)
async def create(self):
if not self.linked_clone:
await self._check_duplicate_linked_clone()
@ -184,21 +186,29 @@ class VirtualBoxVM(BaseNode):
raise VirtualBoxError("The VirtualBox API version is lower than 4.3")
log.info("VirtualBox VM '{name}' [{id}] created".format(name=self.name, id=self.id))
vm_info = await self._get_vm_info()
if "memory" in vm_info:
self._ram = int(vm_info["memory"])
if "UUID" in vm_info:
self._uuid = vm_info["UUID"]
if not self._uuid:
raise VirtualBoxError("Could not find any UUID for VM '{}'".format(self._vmname))
if self.linked_clone:
if self.id and os.path.isdir(os.path.join(self.working_dir, self._vmname)):
self._patch_vm_uuid()
await self.manager.execute("registervm", [self._linked_vbox_file()])
await self._reattach_linked_hdds()
vm_info = await self._get_vm_info()
self._uuid = vm_info.get("UUID")
if not self._uuid:
raise VirtualBoxError("Could not find any UUID for VM '{}'".format(self._vmname))
else:
await self._create_linked_clone()
if self._adapters:
await self.set_adapters(self._adapters)
vm_info = await self._get_vm_info()
if "memory" in vm_info:
self._ram = int(vm_info["memory"])
def _linked_vbox_file(self):
return os.path.join(self.working_dir, self._vmname, self._vmname + ".vbox")
@ -266,7 +276,7 @@ class VirtualBoxVM(BaseNode):
# check if there is enough RAM to run
self.check_available_ram(self.ram)
args = [self._vmname]
args = [self._uuid]
if self._headless:
args.extend(["--type", "headless"])
result = await self.manager.execute("startvm", args)
@ -275,9 +285,9 @@ class VirtualBoxVM(BaseNode):
log.debug("Start result: {}".format(result))
# add a guest property to let the VM know about the GNS3 name
await self.manager.execute("guestproperty", ["set", self._vmname, "NameInGNS3", self.name])
await self.manager.execute("guestproperty", ["set", self._uuid, "NameInGNS3", self.name])
# add a guest property to let the VM know about the GNS3 project directory
await self.manager.execute("guestproperty", ["set", self._vmname, "ProjectDirInGNS3", self.working_dir])
await self.manager.execute("guestproperty", ["set", self._uuid, "ProjectDirInGNS3", self.working_dir])
await self._start_ubridge()
for adapter_number in range(0, self._adapters):
@ -739,7 +749,7 @@ class VirtualBoxVM(BaseNode):
"""
vm_info = {}
results = await self.manager.execute("showvminfo", [self._vmname, "--machinereadable"])
results = await self.manager.execute("showvminfo", ["--machinereadable", "--", self._vmname]) # "--" is to protect against vm names containing the "-" character
for info in results:
try:
name, value = info.split('=', 1)
@ -775,7 +785,7 @@ class VirtualBoxVM(BaseNode):
# set server mode with a pipe on the first serial port
pipe_name = self._get_pipe_name()
args = [self._vmname, "--uartmode1", "server", pipe_name]
args = [self._uuid, "--uartmode1", "server", pipe_name]
await self.manager.execute("modifyvm", args)
async def _storage_attach(self, params):
@ -786,7 +796,7 @@ class VirtualBoxVM(BaseNode):
"""
args = shlex.split(params)
await self.manager.execute("storageattach", [self._vmname] + args)
await self.manager.execute("storageattach", [self._uuid] + args)
async def _get_nic_attachements(self, maximum_adapters):
"""
@ -850,7 +860,7 @@ class VirtualBoxVM(BaseNode):
vbox_adapter_type = "82545EM"
if adapter_type == "Paravirtualized Network (virtio-net)":
vbox_adapter_type = "virtio"
args = [self._vmname, "--nictype{}".format(adapter_number + 1), vbox_adapter_type]
args = [self._uuid, "--nictype{}".format(adapter_number + 1), vbox_adapter_type]
await self.manager.execute("modifyvm", args)
if isinstance(nio, NIOUDP):
@ -888,10 +898,10 @@ class VirtualBoxVM(BaseNode):
gns3_snapshot_exists = True
if not gns3_snapshot_exists:
result = await self.manager.execute("snapshot", [self._vmname, "take", "GNS3 Linked Base for clones"])
result = await self.manager.execute("snapshot", [self._uuid, "take", "GNS3 Linked Base for clones"])
log.debug("GNS3 snapshot created: {}".format(result))
args = [self._vmname,
args = [self._uuid,
"--snapshot",
"GNS3 Linked Base for clones",
"--options",
@ -906,12 +916,12 @@ class VirtualBoxVM(BaseNode):
log.debug("VirtualBox VM: {} cloned".format(result))
self._vmname = self._name
await self.manager.execute("setextradata", [self._vmname, "GNS3/Clone", "yes"])
await self.manager.execute("setextradata", [self._uuid, "GNS3/Clone", "yes"])
# We create a reset snapshot in order to simplify life of user who want to rollback their VM
# Warning: Do not document this it's seem buggy we keep it because Raizo students use it.
try:
args = [self._vmname, "take", "reset"]
args = [self._uuid, "take", "reset"]
result = await self.manager.execute("snapshot", args)
log.debug("Snapshot 'reset' created: {}".format(result))
# It seem sometimes this failed due to internal race condition of Vbox

View File

@ -27,7 +27,6 @@ import io
from operator import itemgetter
from ..utils import parse_version
from ..utils.images import list_images
from ..utils.asyncio import locking
from ..controller.controller_error import ControllerError
from ..version import __version__, __version_info__
@ -405,7 +404,7 @@ class Compute:
raise aiohttp.web.HTTPConflict(text=msg)
else:
msg = "{}\nUsing different versions may result in unexpected problems. Please use at your own risk.".format(msg)
self._controller.notification.emit("log.warning", {"message": msg})
self._controller.notification.controller_emit("log.warning", {"message": msg})
self._notifications = asyncio.gather(self._connect_notification())
self._connected = True
@ -571,8 +570,7 @@ class Compute:
async def images(self, type):
"""
Return the list of images available for this type on controller
and on the compute node.
Return the list of images available for this type on the compute node.
"""
images = []
@ -581,9 +579,9 @@ class Compute:
try:
if type in ["qemu", "dynamips", "iou"]:
for local_image in list_images(type):
if local_image['filename'] not in [i['filename'] for i in images]:
images.append(local_image)
#for local_image in list_images(type):
# if local_image['filename'] not in [i['filename'] for i in images]:
# images.append(local_image)
images = sorted(images, key=itemgetter('filename'))
else:
images = sorted(images, key=itemgetter('image'))

View File

@ -212,9 +212,11 @@ class GNS3VM:
new_settings = copy.copy(self._settings)
new_settings.update(settings)
if self.settings != new_settings:
await self._stop()
self._settings = settings
self._controller.save()
try:
await self._stop()
finally:
self._settings = settings
self._controller.save()
if self.enable:
await self.start()
else:

View File

@ -239,6 +239,8 @@ class HyperVGNS3VM(BaseGNS3VM):
log.info("GNS3 VM has been started")
# Get the guest IP address
# LIS (Linux Integration Services) must be installed on the guest
# See https://oitibs.com/hyper-v-lis-on-ubuntu-18-04/ for details.
trial = 120
guest_ip_address = ""
log.info("Waiting for GNS3 VM IP")

View File

@ -69,18 +69,19 @@ class VMwareGNS3VM(BaseGNS3VM):
if ram % 4 != 0:
raise GNS3VMError("Allocated memory {} for the GNS3 VM must be a multiple of 4".format(ram))
available_vcpus = psutil.cpu_count()
available_vcpus = psutil.cpu_count(logical=True)
if vcpus > available_vcpus:
raise GNS3VMError("You have allocated too many vCPUs for the GNS3 VM! (max available is {} vCPUs)".format(available_vcpus))
try:
pairs = VMware.parse_vmware_file(self._vmx_path)
pairs["numvcpus"] = str(vcpus)
cores_per_sockets = int(available_vcpus / psutil.cpu_count(logical=False))
if cores_per_sockets >= 1:
pairs["cpuid.corespersocket"] = str(cores_per_sockets)
pairs["memsize"] = str(ram)
VMware.write_vmx_file(self._vmx_path, pairs)
if vcpus > 1:
pairs["numvcpus"] = str(vcpus)
cores_per_sockets = int(vcpus / psutil.cpu_count(logical=False))
if cores_per_sockets > 1:
pairs["cpuid.corespersocket"] = str(cores_per_sockets)
pairs["memsize"] = str(ram)
VMware.write_vmx_file(self._vmx_path, pairs)
log.info("GNS3 VM vCPU count set to {} and RAM amount set to {}".format(vcpus, ram))
except OSError as e:
raise GNS3VMError('Could not read/write VMware VMX file "{}": {}'.format(self._vmx_path, e))

View File

@ -58,7 +58,7 @@ class CrashReport:
Report crash to a third party service
"""
DSN = "https://f5e2d00b16764b7d8bb996a9c8980185:4c01020e90ee4657956bc680a25f9959@sentry.io/38482"
DSN = "https://b53e4cbcb01c453f9728dbf891424d18:061bfafc9c184f9bb2012d394f860fc4@sentry.io/38482"
if hasattr(sys, "frozen"):
cacert = get_resource("cacert.pem")
if cacert is not None and os.path.isfile(cacert):

View File

@ -82,13 +82,14 @@ class ComputeHandler:
@Route.get(
r"/computes/{compute_id}/{emulator}/images",
parameters={
"compute_id": "Compute UUID"
"compute_id": "Compute UUID",
"emulator": "Emulator type"
},
status_codes={
200: "OK",
404: "Instance doesn't exist"
},
description="Return the list of images available on compute and controller for this emulator type")
description="Return the list of images available on compute for this emulator type")
async def images(request, response):
controller = Controller.instance()
compute = controller.get_compute(request.match_info["compute_id"])

View File

@ -32,8 +32,8 @@ from prompt_toolkit.shortcuts import create_prompt_application, create_asyncio_e
from prompt_toolkit.terminal.vt100_output import Vt100_Output
from prompt_toolkit.input import StdinInput
from .telnet_server import AsyncioTelnetServer, TelnetConnection
from .input_stream import InputStream
from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer, TelnetConnection
from gns3server.utils.asyncio.input_stream import InputStream
class EmbedShell:
@ -344,7 +344,7 @@ if __name__ == '__main__':
else:
return 'world\n'
return (await world())
return await world()
# Demo using telnet
shell = Demo(welcome_message="Welcome!\n")

View File

@ -139,7 +139,7 @@ def images_directories(type):
paths.append(directory)
# Compatibility with old topologies we look in parent directory
paths.append(img_dir)
# Return only the existings paths
# Return only the existing paths
return [force_unix_path(p) for p in paths if os.path.exists(p)]

View File

@ -23,7 +23,7 @@
# or negative for a release candidate or beta (after the base version
# number has been incremented)
__version__ = "2.2.0dev6"
__version__ = "2.2.0dev7"
__version_info__ = (2, 2, 0, 99)
# If it's a git checkout try to add the commit

View File

@ -179,6 +179,9 @@ class WebServer:
return ssl_context
async def start_shell(self):
log.error("The embedded shell has been deactivated in this version of GNS3")
return
try:
from ptpython.repl import embed
except ImportError:

View File

@ -6,6 +6,5 @@ async_generator>=1.10
Jinja2>=2.7.3
raven>=5.23.0
psutil>=3.0.0
prompt-toolkit==1.0.15
async-timeout==3.0.1
distro>=1.3.0

View File

@ -16,7 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from tests.utils import AsyncioMagicMock
from gns3server.compute.dynamips.nodes.ethernet_switch import EthernetSwitchConsole
#from gns3server.compute.dynamips.nodes.ethernet_switch import EthernetSwitchConsole
from gns3server.compute.nios.nio_udp import NIOUDP
@ -28,10 +28,10 @@ def test_mac_command(async_run):
node.nios[0].name = "Ethernet0"
node.nios[1] = NIOUDP(55, "127.0.0.1", 56)
node.nios[1].name = "Ethernet1"
node._hypervisor.send = AsyncioMagicMock(return_value=["0050.7966.6801 1 Ethernet0", "0050.7966.6802 1 Ethernet1"])
console = EthernetSwitchConsole(node)
assert async_run(console.mac()) == \
"Port Mac VLAN\n" \
"Ethernet0 00:50:79:66:68:01 1\n" \
"Ethernet1 00:50:79:66:68:02 1\n"
node._hypervisor.send.assert_called_with("ethsw show_mac_addr_table Test")
#node._hypervisor.send = AsyncioMagicMock(return_value=["0050.7966.6801 1 Ethernet0", "0050.7966.6802 1 Ethernet1"])
#console = EthernetSwitchConsole(node)
#assert async_run(console.mac()) == \
# "Port Mac VLAN\n" \
# "Ethernet0 00:50:79:66:68:01 1\n" \
# "Ethernet1 00:50:79:66:68:02 1\n"
#node._hypervisor.send.assert_called_with("ethsw show_mac_addr_table Test")

View File

@ -74,7 +74,7 @@ def test_vboxmanage_path(manager, tmpdir):
def test_list_vms(manager, loop):
vm_list = ['"Windows 8.1" {27b4d095-ff5f-4ac4-bb9d-5f2c7861c1f1}',
'"Carriage',
'Return" {27b4d095-ff5f-4ac4-bb9d-5f2c7861c1f1}',
'Return" {27b4d095-ff5f-4ac4-bb9d-5f2c7861c3f3}',
'',
'"<inaccessible>" {42b4d095-ff5f-4ac4-bb9d-5f2c7861c1f1}',
'"Linux Microcore 4.7.1" {ccd8c50b-c172-457d-99fa-dd69371ede0e}']
@ -83,9 +83,10 @@ def test_list_vms(manager, loop):
if cmd == "list":
return vm_list
else:
if args[0] == "Windows 8.1":
print(args)
if args[0] == "27b4d095-ff5f-4ac4-bb9d-5f2c7861c1f1":
return ["memory=512"]
elif args[0] == "Linux Microcore 4.7.1":
elif args[0] == "ccd8c50b-c172-457d-99fa-dd69371ede0e":
return ["memory=256"]
assert False, "Unknow {} {}".format(cmd, args)

View File

@ -75,6 +75,7 @@ def test_rename_vmname(project, manager, async_run):
def test_vm_valid_virtualbox_api_version(loop, project, manager):
with asyncio_patch("gns3server.compute.virtualbox.VirtualBox.execute", return_value=["API version: 4_3"]):
vm = VirtualBoxVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, "test", False)
vm._uuid = "00010203-0405-0607-0809-0a0b0c0d0e0f"
loop.run_until_complete(asyncio.ensure_future(vm.create()))

View File

@ -348,7 +348,7 @@ def test_forward_post(compute, async_run):
def test_images(compute, async_run, images_dir):
"""
Will return image on compute and on controller
Will return image on compute
"""
response = MagicMock()
response.status = 200
@ -357,14 +357,12 @@ def test_images(compute, async_run, images_dir):
"path": "linux.qcow2",
"md5sum": "d41d8cd98f00b204e9800998ecf8427e",
"filesize": 0}]).encode())
open(os.path.join(images_dir, "QEMU", "asa.qcow2"), "w+").close()
with asyncio_patch("aiohttp.ClientSession.request", return_value=response) as mock:
images = async_run(compute.images("qemu"))
mock.assert_called_with("GET", "https://example.com:84/v2/compute/qemu/images", auth=None, data=None, headers={'content-type': 'application/json'}, chunked=None, timeout=None)
async_run(compute.close())
assert images == [
{"filename": "asa.qcow2", "path": "asa.qcow2", "md5sum": "d41d8cd98f00b204e9800998ecf8427e", "filesize": 0},
{"filename": "linux.qcow2", "path": "linux.qcow2", "md5sum": "d41d8cd98f00b204e9800998ecf8427e", "filesize": 0}
]

View File

@ -17,7 +17,7 @@
import asyncio
from gns3server.utils.asyncio.embed_shell import EmbedShell
#from gns3server.utils.asyncio.embed_shell import EmbedShell
#FIXME: this is broken with recent Python >= 3.6
# def test_embed_shell_help(async_run):
@ -39,44 +39,44 @@ from gns3server.utils.asyncio.embed_shell import EmbedShell
# assert async_run(app._parse_command('? hello')) == 'hello: The hello world function\n\nThe hello usage\n'
def test_embed_shell_execute(async_run):
class Application(EmbedShell):
async def hello(self):
"""
The hello world function
The hello usage
"""
return 'world'
reader = asyncio.StreamReader()
writer = asyncio.StreamReader()
app = Application(reader, writer)
assert async_run(app._parse_command('hello')) == 'world'
def test_embed_shell_welcome(async_run, loop):
reader = asyncio.StreamReader()
writer = asyncio.StreamReader()
app = EmbedShell(reader, writer, welcome_message="Hello")
task = loop.create_task(app.run())
assert async_run(writer.read(5)) == b"Hello"
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
def test_embed_shell_prompt(async_run, loop):
reader = asyncio.StreamReader()
writer = asyncio.StreamReader()
app = EmbedShell(reader, writer)
app.prompt = "gbash# "
task = loop.create_task(app.run())
assert async_run(writer.read(7)) == b"gbash# "
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
# def test_embed_shell_execute(async_run):
# class Application(EmbedShell):
#
# async def hello(self):
# """
# The hello world function
#
# The hello usage
# """
# return 'world'
# reader = asyncio.StreamReader()
# writer = asyncio.StreamReader()
# app = Application(reader, writer)
# assert async_run(app._parse_command('hello')) == 'world'
#
#
# def test_embed_shell_welcome(async_run, loop):
# reader = asyncio.StreamReader()
# writer = asyncio.StreamReader()
# app = EmbedShell(reader, writer, welcome_message="Hello")
# task = loop.create_task(app.run())
# assert async_run(writer.read(5)) == b"Hello"
# task.cancel()
# try:
# loop.run_until_complete(task)
# except asyncio.CancelledError:
# pass
#
#
# def test_embed_shell_prompt(async_run, loop):
# reader = asyncio.StreamReader()
# writer = asyncio.StreamReader()
# app = EmbedShell(reader, writer)
# app.prompt = "gbash# "
# task = loop.create_task(app.run())
# assert async_run(writer.read(7)) == b"gbash# "
# task.cancel()
# try:
# loop.run_until_complete(task)
# except asyncio.CancelledError:
# pass