From a8990c9e8945862b67e87fd3e502f5891c491cac Mon Sep 17 00:00:00 2001
From: grossmj
Date: Tue, 26 Feb 2019 15:55:07 +0700
Subject: [PATCH] Non-blocking project export.

---
 gns3server/controller/export_project.py  |  12 +-
 gns3server/controller/project.py          |   9 +-
 .../api/controller/project_handler.py     |  26 +-
 gns3server/utils/asyncio/aiozipstream.py  | 430 ++++++++++++++++++
 requirements.txt                          |   3 +-
 5 files changed, 459 insertions(+), 21 deletions(-)
 create mode 100644 gns3server/utils/asyncio/aiozipstream.py

diff --git a/gns3server/controller/export_project.py b/gns3server/controller/export_project.py
index 9afcd906..c2641364 100644
--- a/gns3server/controller/export_project.py
+++ b/gns3server/controller/export_project.py
@@ -22,7 +22,6 @@
 import asyncio
 import aiohttp
 import zipfile
 import tempfile
-import zipstream
 
 from datetime import datetime
 
@@ -30,7 +29,7 @@
 import logging
 log = logging.getLogger(__name__)
 
 
-async def export_project(project, temporary_dir, include_images=False, keep_compute_id=False, allow_all_nodes=False, reset_mac_addresses=False):
+async def export_project(zstream, project, temporary_dir, include_images=False, keep_compute_id=False, allow_all_nodes=False, reset_mac_addresses=False):
     """
     Export a project to a zip file.
 
@@ -53,8 +52,6 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
     # Make sure we save the project
     project.dump()
 
-    zstream = zipstream.ZipFile(allowZip64=True)
-
     if not os.path.exists(project._path):
         raise aiohttp.web.HTTPNotFound(text="Project could not be found at '{}'".format(project._path))
 
@@ -80,7 +77,7 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
             if file.endswith(".gns3"):
                 continue
             _patch_mtime(path)
-            zstream.write(path, os.path.relpath(path, project._path), compress_type=zipfile.ZIP_DEFLATED)
+            zstream.write(path, os.path.relpath(path, project._path))
 
     # Export files from remote computes
     downloaded_files = set()
@@ -103,11 +100,9 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
                     response.close()
                     f.close()
                 _patch_mtime(temp_path)
-                zstream.write(temp_path, arcname=compute_file["path"], compress_type=zipfile.ZIP_DEFLATED)
+                zstream.write(temp_path, arcname=compute_file["path"])
                 downloaded_files.add(compute_file['path'])
 
-    return zstream
-
 
 def _patch_mtime(path):
     """
@@ -232,6 +227,7 @@ async def _patch_project_file(project, path, zstream, include_images, keep_compu
     zstream.writestr("project.gns3", json.dumps(topology).encode())
     return images
 
+
 def _export_local_image(image, zstream):
     """
     Exports a local image to the zip file.
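With this change export_project() no longer creates or returns the zip stream: the caller constructs an aiozipstream.ZipFile, passes it in, and then drives the chunk generation itself. A minimal sketch of the new calling convention, assuming a caller that writes the archive to a local file (the export_to_file() wrapper and its destination path are illustrative, not part of the patch):

    import tempfile
    import zipfile

    from gns3server.controller.export_project import export_project
    from gns3server.utils.asyncio import aiozipstream


    async def export_to_file(project, dest_path):
        # Illustrative wrapper, not part of the patch.
        # The temporary directory must stay alive while streaming, because
        # files downloaded from remote computes are staged there.
        with tempfile.TemporaryDirectory() as tmpdir:
            with aiozipstream.ZipFile(compression=zipfile.ZIP_DEFLATED) as zstream:
                # export_project() only queues entries on the stream.
                await export_project(zstream, project, tmpdir)
                with open(dest_path, "wb") as f:
                    # Iterating the stream does the actual read/compress/emit
                    # work chunk by chunk, without blocking the event loop.
                    async for chunk in zstream:
                        f.write(chunk)
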
diff --git a/gns3server/controller/project.py b/gns3server/controller/project.py
index a7407a69..5ca2d30b 100644
--- a/gns3server/controller/project.py
+++ b/gns3server/controller/project.py
@@ -24,6 +24,7 @@
 import shutil
 import asyncio
 import aiohttp
 import tempfile
+import zipfile
 
 from uuid import UUID, uuid4
@@ -38,6 +39,7 @@
 from ..utils.path import check_path_allowed, get_default_project_directory
 from ..utils.asyncio.pool import Pool
 from ..utils.asyncio import locking
 from ..utils.asyncio import wait_run_in_executor
+from ..utils.asyncio import aiozipstream
 from .export_project import export_project
 from .import_project import import_project
@@ -976,9 +978,10 @@ class Project:
         assert self._status != "closed"
         try:
             with tempfile.TemporaryDirectory() as tmpdir:
-                zipstream = await export_project(self, tmpdir, keep_compute_id=True, allow_all_nodes=True, reset_mac_addresses=True)
-                project_path = os.path.join(tmpdir, "project.gns3p")
-                await wait_run_in_executor(self._create_duplicate_project_file, project_path, zipstream)
+                with aiozipstream.ZipFile(compression=zipfile.ZIP_STORED) as zstream:
+                    await export_project(zstream, self, tmpdir, keep_compute_id=True, allow_all_nodes=True, reset_mac_addresses=True)
+                    project_path = os.path.join(tmpdir, "project.gns3p")
+                    await wait_run_in_executor(self._create_duplicate_project_file, project_path, zstream)
                 with open(project_path, "rb") as f:
                     project = await import_project(self._controller, str(uuid.uuid4()), f, location=location, name=name, keep_compute_id=True)
         except (ValueError, OSError, UnicodeEncodeError) as e:
diff --git a/gns3server/handlers/api/controller/project_handler.py b/gns3server/handlers/api/controller/project_handler.py
index 8a1ad4d9..a59802d5 100644
--- a/gns3server/handlers/api/controller/project_handler.py
+++ b/gns3server/handlers/api/controller/project_handler.py
@@ -20,11 +20,14 @@
 import sys
 import aiohttp
 import asyncio
 import tempfile
+import zipfile
+import time
 
 from gns3server.web.route import Route
 from gns3server.controller import Controller
 from gns3server.controller.import_project import import_project
 from gns3server.controller.export_project import export_project
+from gns3server.utils.asyncio import aiozipstream
 
 from gns3server.config import Config
@@ -301,19 +304,24 @@ class ProjectHandler:
         controller = Controller.instance()
         project = await controller.get_loaded_project(request.match_info["project_id"])
 
+        try:
+            begin = time.time()
             with tempfile.TemporaryDirectory() as tmp_dir:
-                stream = await export_project(project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0"))))
-                # We need to do that now because export could failed and raise an HTTP error
-                # that why response start need to be the later possible
-                response.content_type = 'application/gns3project'
-                response.headers['CONTENT-DISPOSITION'] = 'attachment; filename="{}.gns3project"'.format(project.name)
-                response.enable_chunked_encoding()
-                await response.prepare(request)
+                with aiozipstream.ZipFile(compression=zipfile.ZIP_DEFLATED) as zstream:
+                    await export_project(zstream, project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0"))))
 
-                for data in stream:
-                    await response.write(data)
+                    # We need to do this now because the export could fail and raise an HTTP error,
+                    # which is why starting the response must happen as late as possible
+                    response.content_type = 'application/octet-stream'
+                    response.headers['CONTENT-DISPOSITION'] = 'attachment; filename="{}.gns3project"'.format(project.name)
+                    response.enable_chunked_encoding()
+                    await response.prepare(request)
+                    async for chunk in zstream:
+                        await response.write(chunk)
+
+            log.info("Project '{}' exported in {:.4f} seconds".format(project.id, time.time() - begin))
 
         #await response.write_eof() #FIXME: shouldn't be needed anymore
         # Will be raised if you have no space left or a permission issue on your temporary directory
         # RuntimeError: something was wrong during the zip process
diff --git a/gns3server/utils/asyncio/aiozipstream.py b/gns3server/utils/asyncio/aiozipstream.py
new file mode 100644
index 00000000..dfcd49ba
--- /dev/null
+++ b/gns3server/utils/asyncio/aiozipstream.py
@@ -0,0 +1,430 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2019 GNS3 Technologies Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Iterable ZIP archive generator.
+
+Derived directly from zipfile.py and the zipstream project
+https://github.com/allanlei/python-zipstream
+"""
+
+import os
+import sys
+import stat
+import struct
+import time
+import zipfile
+import asyncio
+import aiofiles
+from concurrent import futures
+from async_generator import async_generator, yield_
+
+from zipfile import (structCentralDir, structEndArchive64, structEndArchive, structEndArchive64Locator,
+                     stringCentralDir, stringEndArchive64, stringEndArchive, stringEndArchive64Locator)
+
+stringDataDescriptor = b'PK\x07\x08'  # magic number for data descriptor
+
+
+def _get_compressor(compress_type):
+    """
+    Return the compressor.
+ """ + + if compress_type == zipfile.ZIP_DEFLATED: + from zipfile import zlib + return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) + elif compress_type == zipfile.ZIP_BZIP2: + from zipfile import bz2 + return bz2.BZ2Compressor() + elif compress_type == zipfile.ZIP_LZMA: + from zipfile import LZMACompressor + return LZMACompressor() + else: + return None + + +class PointerIO(object): + + def __init__(self, mode='wb'): + if mode not in ('wb', ): + raise RuntimeError('zipstream.ZipFile() requires mode "wb"') + self.data_pointer = 0 + self.__mode = mode + self.__closed = False + + @property + def mode(self): + return self.__mode + + @property + def closed(self): + return self.__closed + + def close(self): + self.__closed = True + + def flush(self): + pass + + def next(self): + raise NotImplementedError() + + def tell(self): + return self.data_pointer + + def truncate(size=None): + raise NotImplementedError() + + def write(self, data): + if self.closed: + raise ValueError('I/O operation on closed file') + + if isinstance(data, str): + data = data.encode('utf-8') + if not isinstance(data, bytes): + raise TypeError('expected bytes') + self.data_pointer += len(data) + return data + + +class ZipInfo(zipfile.ZipInfo): + + def __init__(self, *args, **kwargs): + zipfile.ZipInfo.__init__(self, *args, **kwargs) + + def DataDescriptor(self): + """ + crc-32 4 bytes + compressed size 4 bytes + uncompressed size 4 bytes + """ + + if self.compress_size > zipfile.ZIP64_LIMIT or self.file_size > zipfile.ZIP64_LIMIT: + fmt = b'<4sLQQ' + else: + fmt = b'<4sLLL' + return struct.pack(fmt, stringDataDescriptor, self.CRC, self.compress_size, self.file_size) + + +class ZipFile(zipfile.ZipFile): + + def __init__(self, fileobj=None, mode='w', compression=zipfile.ZIP_STORED, allowZip64=True, chunksize=32768): + """Open the ZIP file with mode write "w".""" + + if mode not in ('w', ): + raise RuntimeError('aiozipstream.ZipFile() requires mode "w"') + if fileobj is None: + fileobj = PointerIO() + + self._comment = b'' + zipfile.ZipFile.__init__(self, fileobj, mode=mode, compression=compression, allowZip64=allowZip64) + self._chunksize = chunksize + self.paths_to_write = [] + + def __aiter__(self): + return self._stream() + + @property + def comment(self): + """ + The comment text associated with the ZIP file. + """ + + return self._comment + + @comment.setter + def comment(self, comment): + """ + Add a comment text associated with the ZIP file. + """ + + if not isinstance(comment, bytes): + raise TypeError("comment: expected bytes, got %s" % type(comment)) + # check for valid comment length + if len(comment) >= zipfile.ZIP_MAX_COMMENT: + if self.debug: + print('Archive comment is too long; truncating to %d bytes' % zipfile.ZIP_MAX_COMMENT) + comment = comment[:zipfile.ZIP_MAX_COMMENT] + self._comment = comment + self._didModify = True + + @async_generator + async def data_generator(self, path): + + async with aiofiles.open(path, "rb") as f: + while True: + part = await f.read(self._chunksize) + if not part: + break + await yield_(part) + return + + async def _run_in_executor(self, task, *args, **kwargs): + """ + Run synchronous task in separate thread and await for result. 
+ """ + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(futures.ThreadPoolExecutor(max_workers=1), task, *args, **kwargs) + + @async_generator + async def _stream(self): + + for kwargs in self.paths_to_write: + async for chunk in self._write(**kwargs): + await yield_(chunk) + for chunk in self._close(): + await yield_(chunk) + + def write(self, filename, arcname=None, compress_type=None): + """ + Write a file to the archive under the name `arcname`. + """ + + kwargs = {'filename': filename, 'arcname': arcname, 'compress_type': compress_type} + self.paths_to_write.append(kwargs) + + def write_iter(self, arcname, iterable, compress_type=None): + """ + Write the bytes iterable `iterable` to the archive under the name `arcname`. + """ + + kwargs = {'arcname': arcname, 'iterable': iterable, 'compress_type': compress_type} + self.paths_to_write.append(kwargs) + + def writestr(self, arcname, data, compress_type=None): + """ + Writes a str into ZipFile by wrapping data as a generator + """ + + def _iterable(): + yield data + return self.write_iter(arcname, _iterable(), compress_type=compress_type) + + @async_generator + async def _write(self, filename=None, iterable=None, arcname=None, compress_type=None): + """ + Put the bytes from filename into the archive under the name `arcname`. + """ + + if not self.fp: + raise RuntimeError( + "Attempt to write to ZIP archive that was already closed") + if (filename is None and iterable is None) or (filename is not None and iterable is not None): + raise ValueError("either (exclusively) filename or iterable shall be not None") + + if filename: + st = os.stat(filename) + isdir = stat.S_ISDIR(st.st_mode) + mtime = time.localtime(st.st_mtime) + date_time = mtime[0:6] + else: + st, isdir, date_time = None, False, time.localtime()[0:6] + # Create ZipInfo instance to store file information + if arcname is None: + arcname = filename + arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) + while arcname[0] in (os.sep, os.altsep): + arcname = arcname[1:] + if isdir: + arcname += '/' + zinfo = ZipInfo(arcname, date_time) + if st: + zinfo.external_attr = (st[0] & 0xFFFF) << 16 # Unix attributes + else: + zinfo.external_attr = 0o600 << 16 # ?rw------- + if compress_type is None: + zinfo.compress_type = self.compression + else: + zinfo.compress_type = compress_type + + if st: + zinfo.file_size = st[6] + else: + zinfo.file_size = 0 + zinfo.flag_bits = 0x00 + zinfo.flag_bits |= 0x08 # ZIP flag bits, bit 3 indicates presence of data descriptor + zinfo.header_offset = self.fp.tell() # Start of header bytes + if zinfo.compress_type == zipfile.ZIP_LZMA: + # Compressed data includes an end-of-stream (EOS) marker + zinfo.flag_bits |= 0x02 + + self._writecheck(zinfo) + self._didModify = True + + if isdir: + zinfo.file_size = 0 + zinfo.compress_size = 0 + zinfo.CRC = 0 + self.filelist.append(zinfo) + self.NameToInfo[zinfo.filename] = zinfo + await yield_(self.fp.write(zinfo.FileHeader(False))) + return + + cmpr = _get_compressor(zinfo.compress_type) + + # Must overwrite CRC and sizes with correct data later + zinfo.CRC = CRC = 0 + zinfo.compress_size = compress_size = 0 + # Compressed size can be larger than uncompressed size + zip64 = self._allowZip64 and zinfo.file_size * 1.05 > zipfile.ZIP64_LIMIT + await yield_(self.fp.write(zinfo.FileHeader(zip64))) + + file_size = 0 + if filename: + async for buf in self.data_generator(filename): + file_size = file_size + len(buf) + CRC = zipfile.crc32(buf, CRC) & 0xffffffff + if cmpr: + buf = await 
self._run_in_executor(cmpr.compress, buf) + compress_size = compress_size + len(buf) + await yield_(self.fp.write(buf)) + else: # we have an iterable + for buf in iterable: + file_size = file_size + len(buf) + CRC = zipfile.crc32(buf, CRC) & 0xffffffff + if cmpr: + buf = await self._run_in_executor(cmpr.compress, buf) + compress_size = compress_size + len(buf) + await yield_(self.fp.write(buf)) + + if cmpr: + buf = cmpr.flush() + compress_size = compress_size + len(buf) + await yield_(self.fp.write(buf)) + zinfo.compress_size = compress_size + else: + zinfo.compress_size = file_size + zinfo.CRC = CRC + zinfo.file_size = file_size + if not zip64 and self._allowZip64: + if file_size > zipfile.ZIP64_LIMIT: + raise RuntimeError('File size has increased during compressing') + if compress_size > zipfile.ZIP64_LIMIT: + raise RuntimeError('Compressed size larger than uncompressed size') + + await yield_(self.fp.write(zinfo.DataDescriptor())) + self.filelist.append(zinfo) + self.NameToInfo[zinfo.filename] = zinfo + + def _close(self): + """ + Close the file, and for mode "w" write the ending records. + """ + + if self.fp is None: + return + + try: + if self.mode in ('w', 'a') and self._didModify: # write ending records + count = 0 + pos1 = self.fp.tell() + for zinfo in self.filelist: # write central directory + count = count + 1 + dt = zinfo.date_time + dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] + dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) + extra = [] + if zinfo.file_size > zipfile.ZIP64_LIMIT or zinfo.compress_size > zipfile.ZIP64_LIMIT: + extra.append(zinfo.file_size) + extra.append(zinfo.compress_size) + file_size = 0xffffffff + compress_size = 0xffffffff + else: + file_size = zinfo.file_size + compress_size = zinfo.compress_size + + if zinfo.header_offset > zipfile.ZIP64_LIMIT: + extra.append(zinfo.header_offset) + header_offset = 0xffffffff + else: + header_offset = zinfo.header_offset + + extra_data = zinfo.extra + min_version = 0 + if extra: + # Append a ZIP64 field to the extra's + extra_data = struct.pack( + b'= zipfile.ZIP_FILECOUNT_LIMIT or + centDirOffset > zipfile.ZIP64_LIMIT or + centDirSize > zipfile.ZIP64_LIMIT): + # Need to write the ZIP64 end-of-archive records + zip64endrec = struct.pack( + structEndArchive64, stringEndArchive64, + 44, 45, 45, 0, 0, centDirCount, centDirCount, + centDirSize, centDirOffset) + yield self.fp.write(zip64endrec) + + zip64locrec = struct.pack( + structEndArchive64Locator, + stringEndArchive64Locator, 0, pos2, 1) + yield self.fp.write(zip64locrec) + centDirCount = min(centDirCount, 0xFFFF) + centDirSize = min(centDirSize, 0xFFFFFFFF) + centDirOffset = min(centDirOffset, 0xFFFFFFFF) + + endrec = struct.pack(structEndArchive, stringEndArchive, + 0, 0, centDirCount, centDirCount, + centDirSize, centDirOffset, len(self._comment)) + yield self.fp.write(endrec) + yield self.fp.write(self._comment) + self.fp.flush() + finally: + fp = self.fp + self.fp = None + if not self._filePassed: + fp.close() diff --git a/requirements.txt b/requirements.txt index 9b6b5904..36bb8b4a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ jsonschema>=2.4.0 aiohttp==3.5.4 aiohttp-cors==0.7.0 +aiofiles==0.4.0 +async_generator>=1.10 Jinja2>=2.7.3 raven>=5.23.0 psutil>=3.0.0 -zipstream>=1.1.4 prompt-toolkit==1.0.15 async-timeout==3.0.1 distro>=1.3.0
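
For reference, a minimal standalone sketch of how the new aiozipstream module is consumed, outside of the GNS3 handlers (the file names and the make_archive() driver are hypothetical, not part of the patch):

    import asyncio
    import zipfile

    from gns3server.utils.asyncio import aiozipstream


    async def make_archive():
        # Hypothetical driver, not part of the patch.
        with aiozipstream.ZipFile(compression=zipfile.ZIP_DEFLATED) as zstream:
            # write()/writestr() only queue entries; nothing is read or
            # compressed until the archive is iterated.
            zstream.write("/etc/hostname", arcname="hostname")
            zstream.writestr("notes.txt", b"generated in memory")
            with open("archive.zip", "wb") as f:
                # Iteration reads each file with aiofiles, compresses it in a
                # worker thread and yields the resulting chunks.
                async for chunk in zstream:
                    f.write(chunk)

    asyncio.get_event_loop().run_until_complete(make_archive())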