diff --git a/gns3server/handlers/api/qemu_handler.py b/gns3server/handlers/api/qemu_handler.py index 085ac286..7520aa4f 100644 --- a/gns3server/handlers/api/qemu_handler.py +++ b/gns3server/handlers/api/qemu_handler.py @@ -357,7 +357,7 @@ class QEMUHandler: response.json(vms) @Route.post( - r"/qemu/vms/{path}", + r"/qemu/vms/{path:.+}", status_codes={ 204: "Image uploaded", }, diff --git a/gns3server/modules/base_manager.py b/gns3server/modules/base_manager.py index 79dbf1b6..e8a68b16 100644 --- a/gns3server/modules/base_manager.py +++ b/gns3server/modules/base_manager.py @@ -31,6 +31,7 @@ from uuid import UUID, uuid4 from gns3server.utils.interfaces import is_interface_up from ..config import Config from ..utils.asyncio import wait_run_in_executor +from ..utils import force_unix_path from .project_manager import ProjectManager from .nios.nio_udp import NIOUDP @@ -372,7 +373,7 @@ class BaseManager: nio = NIOUDP(lport, rhost, rport) elif nio_settings["type"] == "nio_tap": tap_device = nio_settings["tap_device"] - #if not is_interface_up(tap_device): + # if not is_interface_up(tap_device): # raise aiohttp.web.HTTPConflict(text="TAP interface {} does not exist or is down".format(tap_device)) # FIXME: check for permissions on tap device # if not self.has_privileged_access(executable): @@ -408,10 +409,10 @@ class BaseManager: if not os.path.exists(path): old_path = os.path.normpath(os.path.join(img_directory, '..', *s)) if os.path.exists(old_path): - return old_path + return force_unix_path(old_path) - return path - return path + return force_unix_path(path) + return force_unix_path(path) def get_relative_image_path(self, path): """ @@ -427,8 +428,8 @@ class BaseManager: return "" img_directory = self.get_images_directory() path = self.get_abs_image_path(path) - if os.path.dirname(path) == img_directory: - return os.path.basename(path) + if os.path.commonprefix([img_directory, path]) == img_directory: + return os.path.relpath(path, img_directory) return path @asyncio.coroutine @@ -460,11 +461,13 @@ class BaseManager: @asyncio.coroutine def write_image(self, filename, stream): directory = self.get_images_directory() - path = os.path.join(directory, os.path.basename(filename)) + path = os.path.abspath(os.path.join(directory, *os.path.split(filename))) + if os.path.commonprefix([directory, path]) != directory: + raise aiohttp.web.HTTPForbidden(text="Could not write image: {}, {} is forbiden".format(filename, path)) log.info("Writting image file %s", path) try: remove_checksum(path) - os.makedirs(directory, exist_ok=True) + os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb+') as f: while True: packet = yield from stream.read(512) @@ -474,4 +477,4 @@ class BaseManager: os.chmod(path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC) md5sum(path) except OSError as e: - raise aiohttp.web.HTTPConflict(text="Could not write image: {} to {}".format(filename, e)) + raise aiohttp.web.HTTPConflict(text="Could not write image: {} because {}".format(filename, e)) diff --git a/gns3server/utils/__init__.py b/gns3server/utils/__init__.py index e69de29b..753df140 100644 --- a/gns3server/utils/__init__.py +++ b/gns3server/utils/__init__.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# +# Copyright (C) 2015 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import posixpath + + +def force_unix_path(path): + """ + :param path: Path to convert + """ + + path = path.replace("\\", "/") + return posixpath.normpath(path) diff --git a/tests/handlers/api/test_qemu.py b/tests/handlers/api/test_qemu.py index 80cd7600..46614d9b 100644 --- a/tests/handlers/api/test_qemu.py +++ b/tests/handlers/api/test_qemu.py @@ -232,6 +232,25 @@ def test_upload_vm(server, tmpdir): assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf" +def test_upload_vm_ova(server, tmpdir): + with patch("gns3server.modules.Qemu.get_images_directory", return_value=str(tmpdir),): + response = server.post("/qemu/vms/test2.ova/test2.vmdk", body="TEST", raw=True) + assert response.status == 204 + + with open(str(tmpdir / "test2.ova" / "test2.vmdk")) as f: + assert f.read() == "TEST" + + with open(str(tmpdir / "test2.ova" / "test2.vmdk.md5sum")) as f: + checksum = f.read() + assert checksum == "033bd94b1168d7e4f0d644c3c95e35bf" + + +def test_upload_vm_forbiden_location(server, tmpdir): + with patch("gns3server.modules.Qemu.get_images_directory", return_value=str(tmpdir),): + response = server.post("/qemu/vms/../../test2", body="TEST", raw=True) + assert response.status == 403 + + def test_upload_vm_permission_denied(server, tmpdir): with open(str(tmpdir / "test2"), "w+") as f: f.write("") diff --git a/tests/modules/qemu/test_qemu_vm.py b/tests/modules/qemu/test_qemu_vm.py index f42ca403..d66cd32b 100644 --- a/tests/modules/qemu/test_qemu_vm.py +++ b/tests/modules/qemu/test_qemu_vm.py @@ -396,6 +396,13 @@ def test_hda_disk_image(vm, tmpdir): assert vm.hda_disk_image == str(tmpdir / "QEMU" / "test") +def test_hda_disk_image_ova(vm, tmpdir): + + with patch("gns3server.config.Config.get_section_config", return_value={"images_path": str(tmpdir)}): + vm.hda_disk_image = "test.ovf/test.vmdk" + assert vm.hda_disk_image == str(tmpdir / "QEMU" / "test.ovf" / "test.vmdk") + + def test_hdb_disk_image(vm, tmpdir): with patch("gns3server.config.Config.get_section_config", return_value={"images_path": str(tmpdir)}): diff --git a/tests/modules/test_manager.py b/tests/modules/test_manager.py index d76c61cc..6bc32726 100644 --- a/tests/modules/test_manager.py +++ b/tests/modules/test_manager.py @@ -123,6 +123,16 @@ def test_get_relative_image_path(qemu, tmpdir): assert qemu.get_relative_image_path("../test1.bin") == path1 +def test_get_relative_image_path_ova(qemu, tmpdir): + os.makedirs(str(tmpdir / "QEMU" / "test.ovf")) + path = str(tmpdir / "QEMU" / "test.ovf" / "test.bin") + open(path, 'w+').close() + + with patch("gns3server.config.Config.get_section_config", return_value={"images_path": str(tmpdir)}): + assert qemu.get_relative_image_path(path) == os.path.join("test.ovf", "test.bin") + assert qemu.get_relative_image_path(os.path.join("test.ovf", "test.bin")) == os.path.join("test.ovf", "test.bin") + + def test_list_images(loop, qemu, tmpdir): fake_images = ["a.bin", "b.bin", ".blu.bin", "a.bin.md5sum"] diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..33af1a0b --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# +# Copyright (C) 2015 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from gns3server.utils import force_unix_path + + +def test_force_unix_path(): + assert force_unix_path("a/b") == "a/b" + assert force_unix_path("a\\b") == "a/b" + assert force_unix_path("a\\b\\..\\c") == "a/c"