Validate appliance files with Pydantic

This commit is contained in:
grossmj 2021-10-18 21:46:50 +10:30
parent 04934691df
commit 13ff7df9fa
6 changed files with 491 additions and 22 deletions

View File

@ -18,11 +18,9 @@
API routes for appliances. API routes for appliances.
""" """
import os
import logging import logging
from fastapi import APIRouter, Depends, Response, status from fastapi import APIRouter, Depends, Response, status
from fastapi.responses import FileResponse
from typing import Optional, List from typing import Optional, List
from uuid import UUID from uuid import UUID
@ -42,7 +40,10 @@ router = APIRouter()
@router.get("") @router.get("")
async def get_appliances(update: Optional[bool] = False, symbol_theme: Optional[str] = "Classic") -> List[dict]: async def get_appliances(
update: Optional[bool] = False,
symbol_theme: Optional[str] = "Classic"
) -> List[schemas.Appliance]:
""" """
Return all appliances known by the controller. Return all appliances known by the controller.
""" """
@ -54,21 +55,17 @@ async def get_appliances(update: Optional[bool] = False, symbol_theme: Optional[
return [c.asdict() for c in controller.appliance_manager.appliances.values()] return [c.asdict() for c in controller.appliance_manager.appliances.values()]
@router.get("/{appliance_id}/download") @router.get("/{appliance_id}")
def download_appliance(appliance_id: UUID) -> FileResponse: def get_appliance(appliance_id: UUID) -> schemas.Appliance:
""" """
Download an appliance file. Get an appliance file.
""" """
controller = Controller.instance() controller = Controller.instance()
appliance = controller.appliance_manager.appliances.get(str(appliance_id)) appliance = controller.appliance_manager.appliances.get(str(appliance_id))
if not appliance: if not appliance:
raise ControllerNotFoundError(message=f"Could not find appliance '{appliance_id}'") raise ControllerNotFoundError(message=f"Could not find appliance '{appliance_id}'")
return appliance.asdict()
if not os.path.exists(appliance.path):
raise ControllerNotFoundError(message=f"Could not find appliance file '{appliance.path}'")
return FileResponse(appliance.path, media_type="application/json")
@router.post("/{appliance_id}/install", status_code=status.HTTP_204_NO_CONTENT) @router.post("/{appliance_id}/install", status_code=status.HTTP_204_NO_CONTENT)

View File

@ -16,7 +16,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy import copy
import uuid
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -27,12 +26,9 @@ class Appliance:
def __init__(self, path, data, builtin=True): def __init__(self, path, data, builtin=True):
self._data = data.copy() self._data = data.copy()
self._id = data.get("appliance_id", uuid.uuid5(uuid.NAMESPACE_X500, path)) self._id = self._data.get("appliance_id")
self._path = path self._path = path
self._builtin = builtin self._builtin = builtin
if "appliance_id" in self._data:
del self._data["appliance_id"]
if self.status != "broken": if self.status != "broken":
log.debug(f'Appliance "{self.name}" [{self._id}] loaded') log.debug(f'Appliance "{self.name}" [{self._id}] loaded')
@ -84,6 +80,7 @@ class Appliance:
""" """
Appliance data (a hash) Appliance data (a hash)
""" """
data = copy.deepcopy(self._data) data = copy.deepcopy(self._data)
data["builtin"] = self._builtin data["builtin"] = self._builtin
return data return data

View File

@ -214,6 +214,10 @@ class ApplianceManager:
appliances_info = self._find_appliances_from_image_checksum(image_checksum) appliances_info = self._find_appliances_from_image_checksum(image_checksum)
for appliance, image_version in appliances_info: for appliance, image_version in appliances_info:
try:
schemas.Appliance.parse_obj(appliance.asdict())
except ValidationError as e:
log.warning(message=f"Could not validate appliance '{appliance.id}': {e}")
if appliance.versions: if appliance.versions:
for version in appliance.versions: for version in appliance.versions:
if version.get("name") == image_version: if version.get("name") == image_version:
@ -241,6 +245,11 @@ class ApplianceManager:
if not appliance: if not appliance:
raise ControllerNotFoundError(message=f"Could not find appliance '{appliance_id}'") raise ControllerNotFoundError(message=f"Could not find appliance '{appliance_id}'")
try:
schemas.Appliance.parse_obj(appliance.asdict())
except ValidationError as e:
raise ControllerError(message=f"Could not validate appliance '{appliance_id}': {e}")
if version: if version:
if not appliance.versions: if not appliance.versions:
raise ControllerBadRequestError(message=f"Appliance '{appliance_id}' do not have versions") raise ControllerBadRequestError(message=f"Appliance '{appliance_id}' do not have versions")
@ -289,16 +298,18 @@ class ApplianceManager:
path = os.path.join(directory, file) path = os.path.join(directory, file)
try: try:
with open(path, encoding="utf-8") as f: with open(path, encoding="utf-8") as f:
appliance = Appliance(path, json.load(f), builtin=builtin) json_data = json.load(f)
json_data = appliance.asdict() # Check if loaded without error schemas.Appliance.parse_obj(json_data)
appliance = Appliance(path, json_data, builtin=builtin)
appliance_data = appliance.asdict() # Check if loaded without error
if appliance.status != "broken": if appliance.status != "broken":
self._appliances[appliance.id] = appliance self._appliances[appliance.id] = appliance
if not appliance.symbol or appliance.symbol.startswith(":/symbols/"): if not appliance.symbol or appliance.symbol.startswith(":/symbols/"):
# apply a default symbol if the appliance has none or a default symbol # apply a default symbol if the appliance has none or a default symbol
default_symbol = self._get_default_symbol(json_data, symbol_theme) default_symbol = self._get_default_symbol(appliance_data, symbol_theme)
if default_symbol: if default_symbol:
appliance.symbol = default_symbol appliance.symbol = default_symbol
except (ValueError, OSError, KeyError) as e: except (ValueError, OSError, KeyError, ValidationError) as e:
log.warning(f"Cannot load appliance file '{path}': {e}") log.warning(f"Cannot load appliance file '{path}': {e}")
continue continue

View File

@ -24,6 +24,7 @@ from .controller.links import LinkCreate, LinkUpdate, Link
from .controller.computes import ComputeCreate, ComputeUpdate, AutoIdlePC, Compute from .controller.computes import ComputeCreate, ComputeUpdate, AutoIdlePC, Compute
from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template
from .controller.images import Image, ImageType from .controller.images import Image, ImageType
from .controller.appliances import Appliance
from .controller.drawings import Drawing from .controller.drawings import Drawing
from .controller.gns3vm import GNS3VM from .controller.gns3vm import GNS3VM
from .controller.nodes import NodeCreate, NodeUpdate, NodeDuplicate, NodeCapture, Node from .controller.nodes import NodeCreate, NodeUpdate, NodeDuplicate, NodeCapture, Node

View File

@ -0,0 +1,463 @@
#
# Copyright (C) 2021 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
from typing import List, Optional, Union
from uuid import UUID
from pydantic import AnyUrl, BaseModel, EmailStr, Field, confloat, conint, constr
class Category(Enum):
router = 'router'
multilayer_switch = 'multilayer_switch'
switch = 'switch'
firewall = 'firewall'
guest = 'guest'
class RegistryVersion(Enum):
version1 = 1
version2 = 2
version3 = 3
version4 = 4
version5 = 5
version6 = 6
class Status(Enum):
stable = 'stable'
experimental = 'experimental'
broken = 'broken'
class Availability(Enum):
free = 'free'
with_registration = 'with-registration'
free_to_try = 'free-to-try'
service_contract = 'service-contract'
class ConsoleType(Enum):
telnet = 'telnet'
vnc = 'vnc'
http = 'http'
https = 'https'
none = 'none'
class Docker(BaseModel):
adapters: int = Field(..., title='Number of ethernet adapters')
image: str = Field(..., title='Docker image in the Docker Hub')
start_command: Optional[str] = Field(
None,
title='Command executed when the container start. Empty will use the default',
)
environment: Optional[str] = Field(None, title='One KEY=VAR environment by line')
console_type: Optional[ConsoleType] = Field(
None, title='Type of console connection for the administration of the appliance'
)
console_http_port: Optional[int] = Field(
None, description='Internal port in the container of the HTTP server'
)
console_http_path: Optional[str] = Field(
None, description='Path of the web interface'
)
extra_hosts: Optional[str] = Field(
None, description='Hosts which will be written to /etc/hosts into container'
)
extra_volumes: Optional[List[str]] = Field(
None,
description='Additional directories to make persistent that are not included in the images VOLUME directive',
)
class Iou(BaseModel):
ethernet_adapters: int = Field(..., title='Number of ethernet adapters')
serial_adapters: int = Field(..., title='Number of serial adapters')
nvram: int = Field(..., title='Host NVRAM')
ram: int = Field(..., title='Host RAM')
startup_config: str = Field(..., title='Config loaded at startup')
class Chassis(Enum):
chassis_1720 = '1720'
chassis_1721 = '1721'
chassis_1750 = '1750'
chassis_1751 = '1751'
chassis_1760 = '1760'
chassis_2610 = '2610'
chassis_2620 = '2620'
chassis_2610XM = '2610XM'
chassis_2620XM = '2620XM'
chassis_2650XM = '2650XM'
chassis_2621 = '2621'
chassis_2611XM = '2611XM'
chassis_2621XM = '2621XM'
chassis_2651XM = '2651XM'
chassis_3620 = '3620'
chassis_3640 = '3640'
chassis_3660 = '3660'
class Platform(Enum):
c1700 = 'c1700'
c2600 = 'c2600'
c2691 = 'c2691'
c3725 = 'c3725'
c3745 = 'c3745'
c3600 = 'c3600'
c7200 = 'c7200'
class Midplane(Enum):
std = 'std'
vxr = 'vxr'
class Npe(Enum):
npe_100 = 'npe-100'
npe_150 = 'npe-150'
npe_175 = 'npe-175'
npe_200 = 'npe-200'
npe_225 = 'npe-225'
npe_300 = 'npe-300'
npe_400 = 'npe-400'
npe_g2 = 'npe-g2'
class AdapterType(Enum):
e1000 = 'e1000'
e1000_82544gc = 'e1000-82544gc'
e1000_82545em = 'e1000-82545em'
e1000e = 'e1000e'
i82550 = 'i82550'
i82551 = 'i82551'
i82557a = 'i82557a'
i82557b = 'i82557b'
i82557c = 'i82557c'
i82558a = 'i82558a'
i82558b = 'i82558b'
i82559a = 'i82559a'
i82559b = 'i82559b'
i82559c = 'i82559c'
i82559er = 'i82559er'
i82562 = 'i82562'
i82801 = 'i82801'
ne2k_pci = 'ne2k_pci'
pcnet = 'pcnet'
rocker = 'rocker'
rtl8139 = 'rtl8139'
virtio = 'virtio'
virtio_net_pci = 'virtio-net-pci'
vmxnet3 = 'vmxnet3'
class DiskInterface(Enum):
ide = 'ide'
sata = 'sata'
nvme = 'nvme'
scsi = 'scsi'
sd = 'sd'
mtd = 'mtd'
floppy = 'floppy'
pflash = 'pflash'
virtio = 'virtio'
none = 'none'
class Arch(Enum):
aarch64 = 'aarch64'
alpha = 'alpha'
arm = 'arm'
cris = 'cris'
i386 = 'i386'
lm32 = 'lm32'
m68k = 'm68k'
microblaze = 'microblaze'
microblazeel = 'microblazeel'
mips = 'mips'
mips64 = 'mips64'
mips64el = 'mips64el'
mipsel = 'mipsel'
moxie = 'moxie'
or32 = 'or32'
ppc = 'ppc'
ppc64 = 'ppc64'
ppcemb = 'ppcemb'
s390x = 's390x'
sh4 = 'sh4'
sh4eb = 'sh4eb'
sparc = 'sparc'
sparc64 = 'sparc64'
tricore = 'tricore'
unicore32 = 'unicore32'
x86_64 = 'x86_64'
xtensa = 'xtensa'
xtensaeb = 'xtensaeb'
class ConsoleType1(Enum):
telnet = 'telnet'
vnc = 'vnc'
spice = 'spice'
spice_agent = 'spice+agent'
none = 'none'
class BootPriority(Enum):
c = 'c'
d = 'd'
n = 'n'
cn = 'cn'
cd = 'cd'
dn = 'dn'
dc = 'dc'
nc = 'nc'
nd = 'nd'
class Kvm(Enum):
require = 'require'
allow = 'allow'
disable = 'disable'
class ProcessPriority(Enum):
realtime = 'realtime'
very_high = 'very high'
high = 'high'
normal = 'normal'
low = 'low'
very_low = 'very low'
null = 'null'
class Qemu(BaseModel):
adapter_type: AdapterType = Field(..., title='Type of network adapter')
adapters: int = Field(..., title='Number of adapters')
ram: int = Field(..., title='Ram allocated to the appliance (MB)')
cpus: Optional[int] = Field(None, title='Number of Virtual CPU')
hda_disk_interface: Optional[DiskInterface] = Field(
None, title='Disk interface for the installed hda_disk_image'
)
hdb_disk_interface: Optional[DiskInterface] = Field(
None, title='Disk interface for the installed hdb_disk_image'
)
hdc_disk_interface: Optional[DiskInterface] = Field(
None, title='Disk interface for the installed hdc_disk_image'
)
hdd_disk_interface: Optional[DiskInterface] = Field(
None, title='Disk interface for the installed hdd_disk_image'
)
arch: Arch = Field(..., title='Architecture emulated')
console_type: ConsoleType1 = Field(
..., title='Type of console connection for the administration of the appliance'
)
boot_priority: Optional[BootPriority] = Field(
None,
title='Disk boot priority. Refer to -boot option in qemu manual for more details.',
)
kernel_command_line: Optional[str] = Field(
None, title='Command line parameters send to the kernel'
)
kvm: Kvm = Field(..., title='KVM requirements')
options: Optional[str] = Field(
None, title='Optional additional qemu command line options'
)
cpu_throttling: Optional[confloat(ge=0.0, le=100.0)] = Field(
None, title='Throttle the CPU'
)
process_priority: Optional[ProcessPriority] = Field(
None, title='Process priority for QEMU'
)
class Compression(Enum):
bzip2 = 'bzip2'
gzip = 'gzip'
lzma = 'lzma'
xz = 'xz'
rar = 'rar'
zip = 'zip'
field_7z = '7z'
class Image(BaseModel):
filename: str = Field(..., title='Filename')
version: str = Field(..., title='Version of the file')
md5sum: constr(regex=r'^[a-f0-9]{32}$') = Field(..., title='md5sum of the file')
filesize: int = Field(..., title='File size in bytes')
download_url: Optional[Union[AnyUrl, constr(max_length=0)]] = Field(
None, title='Download url where you can download the appliance from a browser'
)
direct_download_url: Optional[Union[AnyUrl, constr(max_length=0)]] = Field(
None,
title='Optional. Non authenticated url to the image file where you can download the image.',
)
compression: Optional[Compression] = Field(
None, title='Optional, compression type of direct download url image.'
)
class Images(BaseModel):
kernel_image: Optional[str] = Field(None, title='Kernel image')
initrd: Optional[str] = Field(None, title='Initrd disk image')
image: Optional[str] = Field(None, title='OS image')
bios_image: Optional[str] = Field(None, title='Bios image')
hda_disk_image: Optional[str] = Field(None, title='Hda disk image')
hdb_disk_image: Optional[str] = Field(None, title='Hdc disk image')
hdc_disk_image: Optional[str] = Field(None, title='Hdd disk image')
hdd_disk_image: Optional[str] = Field(None, title='Hdd diskimage')
cdrom_image: Optional[str] = Field(None, title='cdrom image')
class Version(BaseModel):
name: str = Field(..., title='Name of the version')
idlepc: Optional[constr(regex=r'^0x[0-9a-f]{8}')] = None
images: Optional[Images] = Field(None, title='Images used for this version')
class DynamipsSlot(Enum):
C7200_IO_2FE = 'C7200-IO-2FE'
C7200_IO_FE = 'C7200-IO-FE'
C7200_IO_GE_E = 'C7200-IO-GE-E'
NM_16ESW = 'NM-16ESW'
NM_1E = 'NM-1E'
NM_1FE_TX = 'NM-1FE-TX'
NM_4E = 'NM-4E'
NM_4T = 'NM-4T'
PA_2FE_TX = 'PA-2FE-TX'
PA_4E = 'PA-4E'
PA_4T_ = 'PA-4T+'
PA_8E = 'PA-8E'
PA_8T = 'PA-8T'
PA_A1 = 'PA-A1'
PA_FE_TX = 'PA-FE-TX'
PA_GE = 'PA-GE'
PA_POS_OC3 = 'PA-POS-OC3'
C2600_MB_2FE = 'C2600-MB-2FE'
C2600_MB_1E = 'C2600-MB-1E'
C1700_MB_1FE = 'C1700-MB-1FE'
C2600_MB_2E = 'C2600-MB-2E'
C2600_MB_1FE = 'C2600-MB-1FE'
C1700_MB_WIC1 = 'C1700-MB-WIC1'
GT96100_FE = 'GT96100-FE'
Leopard_2FE = 'Leopard-2FE'
_ = ''
class DynamipsWic(Enum):
WIC_1ENET = 'WIC-1ENET'
WIC_1T = 'WIC-1T'
WIC_2T = 'WIC-2T'
class Dynamips(BaseModel):
chassis: Optional[Chassis] = Field(None, title='Chassis type')
platform: Platform = Field(..., title='Platform type')
ram: conint(ge=1) = Field(..., title='Amount of ram')
nvram: conint(ge=1) = Field(..., title='Amount of nvram')
startup_config: Optional[str] = Field(None, title='Config loaded at startup')
wic0: Optional[DynamipsWic] = None
wic1: Optional[DynamipsWic] = None
wic2: Optional[DynamipsWic] = None
slot0: Optional[DynamipsSlot] = None
slot1: Optional[DynamipsSlot] = None
slot2: Optional[DynamipsSlot] = None
slot3: Optional[DynamipsSlot] = None
slot4: Optional[DynamipsSlot] = None
slot5: Optional[DynamipsSlot] = None
slot6: Optional[DynamipsSlot] = None
midplane: Optional[Midplane] = None
npe: Optional[Npe] = None
class Appliance(BaseModel):
appliance_id: UUID = Field(..., title='Appliance ID')
name: str = Field(..., title='Appliance name')
category: Category = Field(..., title='Category of the appliance')
description: str = Field(
..., title='Description of the appliance. Could be a marketing description'
)
vendor_name: str = Field(..., title='Name of the vendor')
vendor_url: Union[AnyUrl, constr(max_length=0)] = Field(..., title='Website of the vendor')
documentation_url: Optional[Union[AnyUrl, constr(max_length=0)]] = Field(
None,
title='An optional documentation for using the appliance on vendor website',
)
product_name: str = Field(..., title='Product name')
product_url: Optional[Union[AnyUrl, constr(max_length=0)]] = Field(
None, title='An optional product url on vendor website'
)
registry_version: RegistryVersion = Field(
..., title='Version of the registry compatible with this appliance'
)
status: Status = Field(..., title='Document if the appliance is working or not')
availability: Optional[Availability] = Field(
None,
title='About image availability: can be downloaded directly; download requires a free registration; paid but a trial version (time or feature limited) is available; not available publicly',
)
maintainer: str = Field(..., title='Maintainer name')
maintainer_email: Union[EmailStr, constr(max_length=0)] = Field(..., title='Maintainer email')
usage: Optional[str] = Field(None, title='How to use the appliance')
symbol: Optional[str] = Field(None, title='An optional symbol for the appliance')
first_port_name: Optional[str] = Field(
None, title='Optional name of the first networking port example: eth0'
)
port_name_format: Optional[str] = Field(
None, title='Optional formating of the networking port example: eth{0}'
)
port_segment_size: Optional[int] = Field(
None,
title='Optional port segment size. A port segment is a block of port. For example Ethernet0/0 Ethernet0/1 is the module 0 with a port segment size of 2',
)
linked_clone: Optional[bool] = Field(
None, title="False if you don't want to use a single image for all nodes"
)
docker: Optional[Docker] = Field(None, title='Docker specific options')
iou: Optional[Iou] = Field(None, title='IOU specific options')
dynamips: Optional[Dynamips] = Field(None, title='Dynamips specific options')
qemu: Optional[Qemu] = Field(None, title='Qemu specific options')
images: Optional[List[Image]] = Field(None, title='Images for this appliance')
versions: Optional[List[Version]] = Field(None, title='Versions of the appliance')

View File

@ -33,10 +33,10 @@ class TestApplianceRoutes:
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert len(response.json()) > 0 assert len(response.json()) > 0
async def test_appliance_download(self, app: FastAPI, client: AsyncClient) -> None: async def test_get_appliance(self, app: FastAPI, client: AsyncClient) -> None:
appliance_id = "3bf492b6-5717-4257-9bfd-b34617c6f133" # Cisco IOSv appliance appliance_id = "3bf492b6-5717-4257-9bfd-b34617c6f133" # Cisco IOSv appliance
response = await client.get(app.url_path_for("download_appliance", appliance_id=appliance_id)) response = await client.get(app.url_path_for("get_appliance", appliance_id=appliance_id))
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()["appliance_id"] == appliance_id assert response.json()["appliance_id"] == appliance_id