Add azcopy upload support and switch to the default max_concurrency (#1556)

* Add azcopy copy support and switch to the default max_concurrency

* Modify upload_file_data to use upload_file for consistency

* Fix lint issues

* Apply black formatting rules

* Apply isort formatting rules

Co-authored-by: Stas <stishkin@live.com>
This commit is contained in:
Peleus Uhley
2022-06-30 11:30:52 -07:00
committed by GitHub
parent ed69cb48b5
commit fb0e055ca9
2 changed files with 38 additions and 8 deletions

View File

@ -21,9 +21,18 @@ def find_azcopy() -> str:
def azcopy_sync(src: str, dst: str) -> None:
    """Run ``azcopy sync`` recursively from ``src`` to ``dst``.

    Args:
        src: Source location passed straight through to azcopy.
        dst: Destination location passed straight through to azcopy.

    Raises:
        subprocess.CalledProcessError: If azcopy exits with a non-zero status.
    """
    # security note: callers need to understand the src/dst for this.
    command = [find_azcopy(), "sync", src, dst, "--recursive=true"]
    subprocess.check_output(command)  # nosec
def azcopy_copy(src: str, dst: str) -> None:
    """Run ``azcopy cp`` recursively from ``src`` to ``dst``.

    Args:
        src: Source location passed straight through to azcopy.
        dst: Destination location passed straight through to azcopy.

    Raises:
        subprocess.CalledProcessError: If azcopy exits with a non-zero status.
    """
    # security note: callers need to understand the src/dst for this.
    command = [find_azcopy(), "cp", src, dst, "--recursive=true"]
    subprocess.check_output(command)  # nosec

View File

@ -9,6 +9,7 @@ import json
import logging import logging
import os import os
import sys import sys
import tempfile
import time import time
from dataclasses import asdict, is_dataclass from dataclasses import asdict, is_dataclass
from enum import Enum from enum import Enum
@ -36,7 +37,7 @@ from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_random from tenacity.wait import wait_random
from .azcopy import azcopy_sync from .azcopy import azcopy_copy, azcopy_sync
_ACCESSTOKENCACHE_UMASK = 0o077 _ACCESSTOKENCACHE_UMASK = 0o077
@ -364,16 +365,36 @@ class ContainerWrapper:
reraise=True, reraise=True,
) )
def upload_file(self, file_path: str, blob_name: str) -> None:
    """Upload a local file to ``blob_name`` in this container.

    azcopy is tried first. If that attempt raises for any reason, the
    failure is logged and the upload is performed with the Python SDK
    client instead.

    Args:
        file_path: Path of the local file to upload.
        blob_name: Name of the destination blob inside the container.

    Raises:
        Exception: Whatever opening ``file_path`` or
            ``self.client.upload_blob`` raises when the SDK fallback fails.
    """
    try:
        # Split the container URL to insert the blob_name ahead of the
        # query string. A URL with no "?" raises IndexError below, which
        # lands in the handler and falls back to the SDK upload.
        url_parts = self.container_url.split("?", 1)

        # Default to azcopy if it is installed
        azcopy_copy(file_path, url_parts[0] + "/" + blob_name + "?" + url_parts[1])
    except Exception as exc:
        LOGGER.warning(
            "Upload using azcopy failed. Check the azcopy logs for more information."
        )
        # subprocess errors stringify to the *entire* command line, which
        # here includes the destination URL and its query string
        # (presumably a SAS token). Log only the type and exit status so
        # that value never reaches the logs.
        if hasattr(exc, "cmd"):
            LOGGER.warning(
                "%s (exit status: %s)",
                type(exc).__name__,
                getattr(exc, "returncode", None),
            )
        else:
            LOGGER.warning(exc)
        # Indicate the switch in the approach for clarity in debugging
        LOGGER.warning("Now attempting to upload using the Python SDK...")

        # This does not have a try/except since it should be caught by the retry system.
        # The retry system will always attempt azcopy first and this approach second
        with open(file_path, "rb") as handle:
            # Using the Azure SDK default max_concurrency
            self.client.upload_blob(name=blob_name, data=handle, overwrite=True)
    return None
def upload_file_data(self, data: str, blob_name: str) -> None:
    """Upload the string ``data`` as the blob ``blob_name``.

    The text is staged in a file inside a temporary directory and then
    handed to ``upload_file``; the directory is removed afterwards.

    Args:
        data: Text content to upload; written to the staging file as UTF-8.
        blob_name: Name of the destination blob, passed unchanged to
            ``upload_file``.
    """
    # Stage under the final path component only. A blob name containing "/"
    # would otherwise point into a sub-directory that does not exist, and an
    # absolute or ".." name would escape the temporary directory.
    local_name = os.path.basename(blob_name)
    if local_name in ("", ".", ".."):
        local_name = "data"

    with tempfile.TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, local_name)

        # Fixed encoding and no newline translation, so the uploaded bytes
        # do not depend on the platform's defaults.
        with open(filename, "w", encoding="utf-8", newline="") as handle:
            handle.write(data)

        self.upload_file(filename, blob_name)
def upload_dir(self, dir_path: str) -> None: def upload_dir(self, dir_path: str) -> None:
# security note: the src for azcopy comes from the server which is # security note: the src for azcopy comes from the server which is