diff --git a/gns3server/db/repositories/templates.py b/gns3server/db/repositories/templates.py index 4e6f50b1..27cc41b7 100644 --- a/gns3server/db/repositories/templates.py +++ b/gns3server/db/repositories/templates.py @@ -16,9 +16,10 @@ # along with this program. If not, see . from uuid import UUID -from typing import List, Union -from sqlalchemy import select, update, delete +from typing import List, Union, Optional +from sqlalchemy import select, delete from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload from sqlalchemy.orm.session import make_transient from .base import BaseRepository @@ -41,19 +42,22 @@ TEMPLATE_TYPE_TO_MODEL = { class TemplatesRepository(BaseRepository): + def __init__(self, db_session: AsyncSession) -> None: super().__init__(db_session) async def get_template(self, template_id: UUID) -> Union[None, models.Template]: - query = select(models.Template).where(models.Template.template_id == template_id) + query = select(models.Template).\ + options(selectinload(models.Template.images)).\ + where(models.Template.template_id == template_id) result = await self._db_session.execute(query) return result.scalars().first() async def get_templates(self) -> List[models.Template]: - query = select(models.Template) + query = select(models.Template).options(selectinload(models.Template.images)) result = await self._db_session.execute(query) return result.scalars().all() @@ -66,20 +70,14 @@ class TemplatesRepository(BaseRepository): await self._db_session.refresh(db_template) return db_template - async def update_template(self, template_id: UUID, template_update: schemas.TemplateUpdate) -> schemas.Template: + async def update_template(self, db_template: models.Template, template_settings: dict) -> schemas.Template: - update_values = template_update.dict(exclude_unset=True) - - query = update(models.Template). \ - where(models.Template.template_id == template_id). \ - values(update_values) - - await self._db_session.execute(query) + # update the fields directly because update() query couldn't work + for key, value in template_settings.items(): + setattr(db_template, key, value) await self._db_session.commit() - template_db = await self.get_template(template_id) - if template_db: - await self._db_session.refresh(template_db) # force refresh of updated_at value - return template_db + await self._db_session.refresh(db_template) # force refresh of updated_at value + return db_template async def delete_template(self, template_id: UUID) -> bool: @@ -88,18 +86,72 @@ class TemplatesRepository(BaseRepository): await self._db_session.commit() return result.rowcount > 0 - async def duplicate_template(self, template_id: UUID) -> schemas.Template: + async def duplicate_template(self, template_id: UUID) -> Optional[schemas.Template]: - query = select(models.Template).where(models.Template.template_id == template_id) + query = select(models.Template).\ + options(selectinload(models.Template.images)).\ + where(models.Template.template_id == template_id) db_template = (await self._db_session.execute(query)).scalars().first() - if not db_template: - return db_template - - # duplicate db object with new primary key (template_id) - self._db_session.expunge(db_template) - make_transient(db_template) - db_template.template_id = None - self._db_session.add(db_template) - await self._db_session.commit() - await self._db_session.refresh(db_template) + if db_template: + # duplicate db object with new primary key (template_id) + self._db_session.expunge(db_template) + make_transient(db_template) + db_template.template_id = None + self._db_session.add(db_template) + await self._db_session.commit() + await self._db_session.refresh(db_template) return db_template + + async def get_image(self, image_name: str) -> Optional[models.Image]: + """ + Get an image by its name (filename). + """ + + query = select(models.Image).where(models.Image.filename == image_name) + result = await self._db_session.execute(query) + return result.scalars().first() + + async def add_image_to_template( + self, + template_id: UUID, + image: models.Image + ) -> Union[None, models.Template]: + """ + Add an image to template. + """ + + query = select(models.Template).\ + options(selectinload(models.Template.images)).\ + where(models.Template.template_id == template_id) + result = await self._db_session.execute(query) + template_in_db = result.scalars().first() + if not template_in_db: + return None + + template_in_db.images.append(image) + await self._db_session.commit() + await self._db_session.refresh(template_in_db) + return template_in_db + + async def remove_image_from_template( + self, + template_id: UUID, + image: models.Image + ) -> Union[None, models.Template]: + """ + Remove an image from a template. + """ + + query = select(models.Template).\ + options(selectinload(models.Template.images)).\ + where(models.Template.template_id == template_id) + result = await self._db_session.execute(query) + template_in_db = result.scalars().first() + if not template_in_db: + return None + + if image in template_in_db.images: + template_in_db.images.remove(image) + await self._db_session.commit() + await self._db_session.refresh(template_in_db) + return template_in_db diff --git a/gns3server/services/templates.py b/gns3server/services/templates.py index c36bfeb6..6683b39e 100644 --- a/gns3server/services/templates.py +++ b/gns3server/services/templates.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import os import uuid import pydantic @@ -22,6 +23,7 @@ from fastapi.encoders import jsonable_encoder from typing import List from gns3server import schemas +import gns3server.db.models as models from gns3server.db.repositories.templates import TemplatesRepository from gns3server.controller import Controller from gns3server.controller.controller_error import ( @@ -131,6 +133,7 @@ BUILTIN_TEMPLATES = [ class TemplatesService: + def __init__(self, templates_repo: TemplatesRepository): self._templates_repo = templates_repo @@ -152,6 +155,44 @@ class TemplatesService: templates.append(jsonable_encoder(builtin_template)) return templates + async def _find_image(self, image_name): + + image = await self._templates_repo.get_image(image_name) + if not image or not os.path.exists(image.path): + raise ControllerNotFoundError(f"Image {image_name} could not be found") + return image + + async def _find_images(self, template_type: str, settings: dict) -> List[models.Image]: + + images_to_add_to_template = [] + if template_type == "dynamips": + if settings["image"]: + image = await self._find_image(settings["image"]) + if image.image_type != "ios": + raise ControllerBadRequestError( + f"Image '{image.filename}' type is not 'ios' but '{image.image_type}'" + ) + images_to_add_to_template.append(image) + elif template_type == "iou": + if settings["path"]: + image = await self._find_image(settings["path"]) + if image.image_type != "iou": + raise ControllerBadRequestError( + f"Image '{image.filename}' type is not 'iou' but '{image.image_type}'" + ) + images_to_add_to_template.append(image) + elif template_type == "qemu": + for key, value in settings.items(): + if key.endswith("_image") and value: + image = await self._find_image(value) + if image.image_type != "qemu": + raise ControllerBadRequestError( + f"Image '{image.filename}' type is not 'qemu' but '{image.image_type}'" + ) + if image not in images_to_add_to_template: + images_to_add_to_template.append(image) + return images_to_add_to_template + async def create_template(self, template_create: schemas.TemplateCreate) -> dict: try: @@ -167,7 +208,11 @@ class TemplatesService: settings = dynamips_template_settings_with_defaults.dict() except pydantic.ValidationError as e: raise ControllerBadRequestError(f"JSON schema error received while creating new template: {e}") + + images_to_add_to_template = await self._find_images(template_create.template_type, settings) db_template = await self._templates_repo.create_template(template_create.template_type, settings) + for image in images_to_add_to_template: + await self._templates_repo.add_image_to_template(db_template.template_id, image) template = db_template.asjson() self._controller.notification.controller_emit("template.created", template) return template @@ -183,13 +228,34 @@ class TemplatesService: raise ControllerNotFoundError(f"Template '{template_id}' not found") return template + async def _remove_image(self, template_id: UUID, image:str) -> None: + + image = await self._templates_repo.get_image(image) + await self._templates_repo.remove_image_from_template(template_id, image) + async def update_template(self, template_id: UUID, template_update: schemas.TemplateUpdate) -> dict: if self.get_builtin_template(template_id): raise ControllerForbiddenError(f"Template '{template_id}' cannot be updated because it is built-in") - db_template = await self._templates_repo.update_template(template_id, template_update) + template_settings = jsonable_encoder(template_update, exclude_unset=True) + + db_template = await self._templates_repo.get_template(template_id) if not db_template: raise ControllerNotFoundError(f"Template '{template_id}' not found") + + images_to_add_to_template = await self._find_images(db_template.template_type, template_settings) + if db_template.template_type == "dynamips" and "image" in template_settings: + await self._remove_image(db_template.template_id, db_template.image) + elif db_template.template_type == "iou" and "path" in template_settings: + await self._remove_image(db_template.template_id, db_template.path) + elif db_template.template_type == "qemu": + for key in template_update.dict().keys(): + if key.endswith("_image") and key in template_settings: + await self._remove_image(db_template.template_id, db_template.__dict__[key]) + + db_template = await self._templates_repo.update_template(db_template, template_settings) + for image in images_to_add_to_template: + await self._templates_repo.add_image_to_template(db_template.template_id, image) template = db_template.asjson() self._controller.notification.controller_emit("template.updated", template) return template diff --git a/tests/api/routes/controller/test_templates.py b/tests/api/routes/controller/test_templates.py index 2c008558..5e174ebc 100644 --- a/tests/api/routes/controller/test_templates.py +++ b/tests/api/routes/controller/test_templates.py @@ -15,13 +15,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import os import pytest import uuid from pathlib import Path from fastapi import FastAPI, status from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession +from tests.utils import asyncio_patch +from gns3server.db.repositories.images import ImagesRepository +from gns3server.db.repositories.templates import TemplatesRepository from gns3server.controller import Controller from gns3server.services.templates import BUILTIN_TEMPLATES @@ -91,7 +96,7 @@ class TestTemplateRoutes: assert response.status_code == status.HTTP_200_OK assert response.json()["template_id"] == template_id - params["name"] = "VPCS_TEST_RENAMED" + params = {"name": "VPCS_TEST_RENAMED", "console_auto_start": True} response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params) assert response.status_code == status.HTTP_200_OK @@ -210,42 +215,43 @@ class TestDynamipsTemplate: "image": "c7200-adventerprisek9-mz.124-24.T5.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c7200-adventerprisek9-mz.124-24.T5.image", - "mac_addr": "", - "midplane": "vxr", - "mmap": True, - "name": "Cisco c7200 template", - "npe": "npe-400", - "nvram": 512, - "platform": "c7200", - "private_config": "", - "ram": 512, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} - - for item, value in expected_response.items(): - assert response.json().get(item) == value + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c7200-adventerprisek9-mz.124-24.T5.image", + "mac_addr": "", + "midplane": "vxr", + "mmap": True, + "name": "Cisco c7200 template", + "npe": "npe-400", + "nvram": 512, + "platform": "c7200", + "private_config": "", + "ram": 512, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c3745_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None: @@ -255,40 +261,42 @@ class TestDynamipsTemplate: "image": "c3745-adventerprisek9-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c3745-adventerprisek9-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c3745 template", - "iomem": 5, - "nvram": 256, - "platform": "c3745", - "private_config": "", - "ram": 256, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c3745-adventerprisek9-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c3745 template", + "iomem": 5, + "nvram": 256, + "platform": "c3745", + "private_config": "", + "ram": 256, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c3725_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None: @@ -298,40 +306,42 @@ class TestDynamipsTemplate: "image": "c3725-adventerprisek9-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c3725-adventerprisek9-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c3725 template", - "iomem": 5, - "nvram": 256, - "platform": "c3725", - "private_config": "", - "ram": 128, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c3725-adventerprisek9-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c3725 template", + "iomem": 5, + "nvram": 256, + "platform": "c3725", + "private_config": "", + "ram": 128, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c3600_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None: @@ -342,41 +352,43 @@ class TestDynamipsTemplate: "image": "c3660-a3jk9s-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c3660-a3jk9s-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c3600 template", - "iomem": 5, - "nvram": 128, - "platform": "c3600", - "chassis": "3660", - "private_config": "", - "ram": 192, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c3660-a3jk9s-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c3600 template", + "iomem": 5, + "nvram": 128, + "platform": "c3600", + "chassis": "3660", + "private_config": "", + "ram": 192, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c3600_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None: @@ -398,40 +410,42 @@ class TestDynamipsTemplate: "image": "c2691-adventerprisek9-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c2691-adventerprisek9-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c2691 template", - "iomem": 5, - "nvram": 256, - "platform": "c2691", - "private_config": "", - "ram": 192, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c2691-adventerprisek9-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c2691 template", + "iomem": 5, + "nvram": 256, + "platform": "c2691", + "private_config": "", + "ram": 192, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c2600_dynamips_template_create(self, app: FastAPI, client: AsyncClient) -> None: @@ -442,41 +456,43 @@ class TestDynamipsTemplate: "image": "c2600-adventerprisek9-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c2600-adventerprisek9-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c2600 template", - "iomem": 15, - "nvram": 128, - "platform": "c2600", - "chassis": "2651XM", - "private_config": "", - "ram": 160, - "sparsemem": True, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c2600-adventerprisek9-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c2600 template", + "iomem": 15, + "nvram": 128, + "platform": "c2600", + "chassis": "2651XM", + "private_config": "", + "ram": 160, + "sparsemem": True, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c2600_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None: @@ -499,41 +515,43 @@ class TestDynamipsTemplate: "image": "c1700-adventerprisek9-mz.124-25d.image", "template_type": "dynamips"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "dynamips", - "auto_delete_disks": False, - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "R{0}", - "disk0": 0, - "disk1": 0, - "exec_area": 64, - "idlemax": 500, - "idlepc": "", - "idlesleep": 30, - "image": "c1700-adventerprisek9-mz.124-25d.image", - "mac_addr": "", - "mmap": True, - "name": "Cisco c1700 template", - "iomem": 15, - "nvram": 128, - "platform": "c1700", - "chassis": "1760", - "private_config": "", - "ram": 160, - "sparsemem": False, - "startup_config": "ios_base_startup-config.txt", - "symbol": ":/symbols/router.svg", - "system_id": "FTX0945W0MY"} + expected_response = {"template_type": "dynamips", + "auto_delete_disks": False, + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "R{0}", + "disk0": 0, + "disk1": 0, + "exec_area": 64, + "idlemax": 500, + "idlepc": "", + "idlesleep": 30, + "image": "c1700-adventerprisek9-mz.124-25d.image", + "mac_addr": "", + "mmap": True, + "name": "Cisco c1700 template", + "iomem": 15, + "nvram": 128, + "platform": "c1700", + "chassis": "1760", + "private_config": "", + "ram": 160, + "sparsemem": False, + "startup_config": "ios_base_startup-config.txt", + "symbol": ":/symbols/router.svg", + "system_id": "FTX0945W0MY"} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value async def test_c1700_dynamips_template_create_wrong_chassis(self, app: FastAPI, client: AsyncClient) -> None: @@ -569,31 +587,33 @@ class TestIOUTemplate: "path": image_path, "template_type": "iou"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"template_type": "iou", - "builtin": False, - "category": "router", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "default_name_format": "IOU{0}", - "ethernet_adapters": 2, - "name": "IOU template", - "nvram": 128, - "path": image_path, - "private_config": "", - "ram": 256, - "serial_adapters": 2, - "startup_config": "iou_l3_base_startup-config.txt", - "symbol": ":/symbols/multilayer_switch.svg", - "use_default_iou_values": True, - "l1_keepalives": False} + expected_response = {"template_type": "iou", + "builtin": False, + "category": "router", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "IOU{0}", + "ethernet_adapters": 2, + "name": "IOU template", + "nvram": 128, + "path": image_path, + "private_config": "", + "ram": 256, + "serial_adapters": 2, + "startup_config": "iou_l3_base_startup-config.txt", + "symbol": ":/symbols/multilayer_switch.svg", + "use_default_iou_values": True, + "l1_keepalives": False} - for item, value in expected_response.items(): - assert response.json().get(item) == value + for item, value in expected_response.items(): + assert response.json().get(item) == value class TestDockerTemplate: @@ -643,54 +663,57 @@ class TestQemuTemplate: "ram": 512, "template_type": "qemu"} - response = await client.post(app.url_path_for("create_template"), json=params) - assert response.status_code == status.HTTP_201_CREATED - assert response.json()["template_id"] is not None + with asyncio_patch("gns3server.services.templates.TemplatesService._find_images", return_value=[]) as mock: + response = await client.post(app.url_path_for("create_template"), json=params) + assert mock.called + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["template_id"] is not None - expected_response = {"adapter_type": "e1000", - "adapters": 1, - "template_type": "qemu", - "bios_image": "", - "boot_priority": "c", - "builtin": False, - "category": "guest", - "cdrom_image": "", - "compute_id": "local", - "console_auto_start": False, - "console_type": "telnet", - "cpu_throttling": 0, - "cpus": 1, - "default_name_format": "{name}-{0}", - "first_port_name": "", - "hda_disk_image": "IOSvL2-15.2.4.0.55E.qcow2", - "hda_disk_interface": "none", - "hdb_disk_image": "", - "hdb_disk_interface": "none", - "hdc_disk_image": "", - "hdc_disk_interface": "none", - "hdd_disk_image": "", - "hdd_disk_interface": "none", - "initrd": "", - "kernel_command_line": "", - "kernel_image": "", - "legacy_networking": False, - "linked_clone": True, - "mac_address": "", - "name": "Qemu template", - "on_close": "power_off", - "options": "", - "platform": "i386", - "port_name_format": "Ethernet{0}", - "port_segment_size": 0, - "process_priority": "normal", - "qemu_path": "", - "ram": 512, - "symbol": ":/symbols/qemu_guest.svg", - "usage": "", - "custom_adapters": []} + expected_response = {"adapter_type": "e1000", + "adapters": 1, + "template_type": "qemu", + "bios_image": "", + "boot_priority": "c", + "builtin": False, + "category": "guest", + "cdrom_image": "", + "compute_id": "local", + "console_auto_start": False, + "console_type": "telnet", + "cpu_throttling": 0, + "cpus": 1, + "default_name_format": "{name}-{0}", + "first_port_name": "", + "hda_disk_image": "IOSvL2-15.2.4.0.55E.qcow2", + "hda_disk_interface": "none", + "hdb_disk_image": "", + "hdb_disk_interface": "none", + "hdc_disk_image": "", + "hdc_disk_interface": "none", + "hdd_disk_image": "", + "hdd_disk_interface": "none", + "initrd": "", + "kernel_command_line": "", + "kernel_image": "", + "legacy_networking": False, + "linked_clone": True, + "mac_address": "", + "name": "Qemu template", + "on_close": "power_off", + "options": "", + "platform": "i386", + "port_name_format": "Ethernet{0}", + "port_segment_size": 0, + "process_priority": "normal", + "qemu_path": "", + "ram": 512, + "symbol": ":/symbols/qemu_guest.svg", + "usage": "", + "custom_adapters": []} + + for item, value in expected_response.items(): + assert response.json().get(item) == value - for item, value in expected_response.items(): - assert response.json().get(item) == value class TestVMwareTemplate: @@ -944,3 +967,204 @@ class TestCloudTemplate: for item, value in expected_response.items(): assert response.json().get(item) == value + + +class TestImageAssociationWithTemplate: + + @pytest.mark.parametrize( + "image_name, image_type, params", + ( + ( + "c7200-adventerprisek9-mz.124-24.T5.image", + "ios", + { + "template_id": "6d85c8db-640f-4547-8955-bc132f7d7196", + "name": "Cisco c7200 template", + "platform": "c7200", + "compute_id": "local", + "image": "", + "template_type": "dynamips" + } + ), + ( + "i86bi_linux-ipbase-ms-12.4.bin", + "iou", + { + "template_id": "0014185e-bdfe-454b-86cd-9009c23900c5", + "name": "IOU template", + "compute_id": "local", + "path": "", + "template_type": "iou" + } + ), + ( + "image.qcow2", + "qemu", + { + "template_id": "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff", + "name": "Qemu template", + "compute_id": "local", + "platform": "i386", + "hda_disk_image": "", + "hdb_disk_image": "", + "hdc_disk_image": "", + "hdd_disk_image": "", + "cdrom_image": "", + "kernel_image": "", + "bios_image": "", + "ram": 512, + "template_type": "qemu" + } + ), + ), + ) + async def test_template_create_with_images( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession, + tmpdir: str, + image_name: str, + image_type: str, + params: dict + ) -> None: + + path = os.path.join(tmpdir, image_name) + with open(path, "wb+") as f: + f.write(b'\x42\x42\x42\x42') + images_repo = ImagesRepository(db_session) + await images_repo.add_image(image_name, image_type, 42, path, "e342eb86c1229b6c154367a5476969b5", "md5") + for key, value in params.items(): + if value == "": + params[key] = image_name + response = await client.post(app.url_path_for("create_template"), json=params) + assert response.status_code == status.HTTP_201_CREATED + + templates_repo = TemplatesRepository(db_session) + db_template = await templates_repo.get_template(uuid.UUID(params["template_id"])) + assert len(db_template.images) == 1 + assert db_template.images[0].filename == image_name + + @pytest.mark.parametrize( + "image_name, image_type, template_id, params", + ( + ( + "c7200-adventerprisek9-mz.155-2.XB.image", + "ios", + "6d85c8db-640f-4547-8955-bc132f7d7196", + { + "image": "", + } + ), + ( + "i86bi-linux-l2-adventerprisek9-15.2d.bin", + "iou", + "0014185e-bdfe-454b-86cd-9009c23900c5", + { + "path": "", + } + ), + ( + "new_image.qcow2", + "qemu", + "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff", + { + "hda_disk_image": "", + "hdb_disk_image": "", + "hdc_disk_image": "", + "hdd_disk_image": "", + "cdrom_image": "", + "kernel_image": "", + "bios_image": "", + } + ), + ), + ) + async def test_template_update_with_images( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession, + tmpdir: str, + image_name: str, + image_type: str, + template_id: str, + params: dict + ) -> None: + + path = os.path.join(tmpdir, image_name) + with open(path, "wb+") as f: + f.write(b'\x42\x42\x42\x42') + images_repo = ImagesRepository(db_session) + await images_repo.add_image(image_name, image_type, 42, path, "e342eb86c1229b6c154367a5476969b5", "md5") + + for key, value in params.items(): + if value == "": + params[key] = image_name + response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params) + assert response.status_code == status.HTTP_200_OK + + templates_repo = TemplatesRepository(db_session) + db_template = await templates_repo.get_template(uuid.UUID(template_id)) + assert len(db_template.images) == 1 + assert db_template.images[0].filename == image_name + + @pytest.mark.parametrize( + "template_id, params", + ( + ( + "6d85c8db-640f-4547-8955-bc132f7d7196", + { + "image": "", + } + ), + ( + "0014185e-bdfe-454b-86cd-9009c23900c5", + { + "path": "", + } + ), + ( + "97ef56a5-7ae4-4795-ad4c-e7dcdd745cff", + { + "hda_disk_image": "", + "hdb_disk_image": "", + "hdc_disk_image": "", + "hdd_disk_image": "", + "cdrom_image": "", + "kernel_image": "", + "bios_image": "", + } + ), + ), + ) + async def test_remove_images_from_template( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession, + template_id: str, + params: dict + ) -> None: + + for key, value in params.items(): + if value == "": + params[key] = "" + response = await client.put(app.url_path_for("update_template", template_id=template_id), json=params) + assert response.status_code == status.HTTP_200_OK + + templates_repo = TemplatesRepository(db_session) + db_template = await templates_repo.get_template(uuid.UUID(template_id)) + assert len(db_template.images) == 0 + + async def test_template_create_with_non_existing_image(self, app: FastAPI, client: AsyncClient) -> None: + + params = {"name": "Qemu template", + "compute_id": "local", + "platform": "i386", + "hda_disk_image": "unkown_image.qcow2", + "ram": 512, + "template_type": "qemu"} + + response = await client.post(app.url_path_for("create_template"), json=params) + assert response.status_code == status.HTTP_404_NOT_FOUND