address formatting from black 21.4b0 (#831)

This commit is contained in:
bmc-msft
2021-04-26 12:35:16 -04:00
committed by GitHub
parent b5db1bd3fe
commit cf3d904940
23 changed files with 128 additions and 128 deletions

View File

@ -165,7 +165,7 @@ class Endpoint:
class Files(Endpoint):
""" Interact with files within a container """
"""Interact with files within a container"""
endpoint = "files"
@ -177,19 +177,19 @@ class Files(Endpoint):
def list(
self, container: primitives.Container, prefix: Optional[str] = None
) -> models.Files:
""" Get a list of files in a container """
"""Get a list of files in a container"""
self.logger.debug("listing files in container: %s", container)
client = self._get_client(container)
return models.Files(files=client.list_blobs(name_starts_with=prefix))
def delete(self, container: primitives.Container, filename: str) -> None:
""" delete a file from a container """
"""delete a file from a container"""
self.logger.debug("deleting in container: %s:%s", container, filename)
client = self._get_client(container)
client.delete_blob(filename)
def get(self, container: primitives.Container, filename: str) -> bytes:
""" get a file from a container """
"""get a file from a container"""
self.logger.debug("getting file from container: %s:%s", container, filename)
client = self._get_client(container)
downloaded = client.download_blob(filename)
@ -201,7 +201,7 @@ class Files(Endpoint):
file_path: str,
blob_name: Optional[str] = None,
) -> None:
""" uploads a file to a container """
"""uploads a file to a container"""
if not blob_name:
# Default blob name to file basename. This means that the file data will be
# written to the "root" of the container, if simulating a directory tree.
@ -220,7 +220,7 @@ class Files(Endpoint):
def upload_dir(
self, container: primitives.Container, dir_path: primitives.Directory
) -> None:
""" uploads a directory to a container """
"""uploads a directory to a container"""
self.logger.debug("uploading directory to container %s:%s", container, dir_path)
@ -230,7 +230,7 @@ class Files(Endpoint):
def download_dir(
self, container: primitives.Container, dir_path: primitives.Directory
) -> None:
""" downloads a container to a directory """
"""downloads a container to a directory"""
self.logger.debug(
"downloading container to directory %s:%s", container, dir_path
@ -241,10 +241,10 @@ class Files(Endpoint):
class Versions(Endpoint):
""" Onefuzz Instance """
"""Onefuzz Instance"""
def check(self, exact: bool = False) -> str:
""" Compare API and CLI versions for compatibility """
"""Compare API and CLI versions for compatibility"""
versions = self.onefuzz.info.get().versions
api_str = versions["onefuzz"].version
cli_str = __version__
@ -279,23 +279,23 @@ class Versions(Endpoint):
class Info(Endpoint):
""" Information about the OneFuzz instance """
"""Information about the OneFuzz instance"""
endpoint = "info"
def get(self) -> responses.Info:
""" Get information about the OneFuzz instance """
"""Get information about the OneFuzz instance"""
self.logger.debug("getting info")
return self._req_model("GET", responses.Info)
class Webhooks(Endpoint):
""" Interact with Webhooks """
"""Interact with Webhooks"""
endpoint = "webhooks"
def get(self, webhook_id: UUID_EXPANSION) -> webhooks.Webhook:
""" get a webhook """
"""get a webhook"""
webhook_id_expanded = self._disambiguate_uuid(
"webhook_id", webhook_id, lambda: [str(x.webhook_id) for x in self.list()]
@ -309,7 +309,7 @@ class Webhooks(Endpoint):
)
def list(self) -> List[webhooks.Webhook]:
""" list webhooks """
"""list webhooks"""
self.logger.debug("listing webhooks")
return self._req_model_list(
@ -326,7 +326,7 @@ class Webhooks(Endpoint):
*,
secret_token: Optional[str] = None,
) -> webhooks.Webhook:
""" Create a webhook """
"""Create a webhook"""
self.logger.debug("creating webhook. name: %s", name)
return self._req_model(
"POST",
@ -345,7 +345,7 @@ class Webhooks(Endpoint):
event_types: Optional[List[events.EventType]] = None,
secret_token: Optional[str] = None,
) -> webhooks.Webhook:
""" Update a webhook """
"""Update a webhook"""
webhook_id_expanded = self._disambiguate_uuid(
"webhook_id", webhook_id, lambda: [str(x.webhook_id) for x in self.list()]
@ -365,7 +365,7 @@ class Webhooks(Endpoint):
)
def delete(self, webhook_id: UUID_EXPANSION) -> responses.BoolResult:
""" Delete a webhook """
"""Delete a webhook"""
webhook_id_expanded = self._disambiguate_uuid(
"webhook_id", webhook_id, lambda: [str(x.webhook_id) for x in self.list()]
@ -378,7 +378,7 @@ class Webhooks(Endpoint):
)
def ping(self, webhook_id: UUID_EXPANSION) -> events.EventPing:
""" ping a webhook """
"""ping a webhook"""
webhook_id_expanded = self._disambiguate_uuid(
"webhook_id", webhook_id, lambda: [str(x.webhook_id) for x in self.list()]
@ -393,7 +393,7 @@ class Webhooks(Endpoint):
)
def logs(self, webhook_id: UUID_EXPANSION) -> List[webhooks.WebhookMessageLog]:
""" retreive webhook event log """
"""retreive webhook event log"""
webhook_id_expanded = self._disambiguate_uuid(
"webhook_id", webhook_id, lambda: [str(x.webhook_id) for x in self.list()]
@ -409,7 +409,7 @@ class Webhooks(Endpoint):
class Containers(Endpoint):
""" Interact with Onefuzz containers """
"""Interact with Onefuzz containers"""
endpoint = "containers"
@ -418,7 +418,7 @@ class Containers(Endpoint):
self.files = Files(onefuzz)
def get(self, name: str) -> responses.ContainerInfo:
""" Get a fully qualified SAS URL for a container """
"""Get a fully qualified SAS URL for a container"""
self.logger.debug("get container: %s", name)
return self._req_model(
"GET", responses.ContainerInfo, data=requests.ContainerGet(name=name)
@ -427,7 +427,7 @@ class Containers(Endpoint):
def create(
self, name: str, metadata: Optional[Dict[str, str]] = None
) -> responses.ContainerInfo:
""" Create a storage container """
"""Create a storage container"""
self.logger.debug("create container: %s", name)
return self._req_model(
"POST",
@ -436,14 +436,14 @@ class Containers(Endpoint):
)
def delete(self, name: str) -> responses.BoolResult:
""" Delete a storage container """
"""Delete a storage container"""
self.logger.debug("delete container: %s", name)
return self._req_model(
"DELETE", responses.BoolResult, data=requests.ContainerDelete(name=name)
)
def list(self) -> List[responses.ContainerInfoBase]:
""" Get a list of containers """
"""Get a list of containers"""
self.logger.debug("list containers")
return self._req_model_list("GET", responses.ContainerInfoBase)
@ -480,12 +480,12 @@ class Containers(Endpoint):
class Repro(Endpoint):
""" Interact with Reproduction VMs """
"""Interact with Reproduction VMs"""
endpoint = "repro_vms"
def get(self, vm_id: UUID_EXPANSION) -> models.Repro:
""" get information about a Reproduction VM """
"""get information about a Reproduction VM"""
vm_id_expanded = self._disambiguate_uuid(
"vm_id", vm_id, lambda: [str(x.vm_id) for x in self.list()]
)
@ -498,7 +498,7 @@ class Repro(Endpoint):
def create(
self, container: primitives.Container, path: str, duration: int = 24
) -> models.Repro:
""" Create a Reproduction VM from a Crash Report """
"""Create a Reproduction VM from a Crash Report"""
self.logger.info(
"creating repro vm: %s %s (%d hours)", container, path, duration
)
@ -509,7 +509,7 @@ class Repro(Endpoint):
)
def delete(self, vm_id: UUID_EXPANSION) -> models.Repro:
""" Delete a Reproduction VM """
"""Delete a Reproduction VM"""
vm_id_expanded = self._disambiguate_uuid(
"vm_id", vm_id, lambda: [str(x.vm_id) for x in self.list()]
)
@ -520,14 +520,14 @@ class Repro(Endpoint):
)
def list(self) -> List[models.Repro]:
""" List all VMs """
"""List all VMs"""
self.logger.debug("listing repro vms")
return self._req_model_list("GET", models.Repro, data=requests.ReproGet())
def _dbg_linux(
self, repro: models.Repro, debug_command: Optional[str]
) -> Optional[str]:
""" Launch gdb with GDB script that includes 'target remote | ssh ...' """
"""Launch gdb with GDB script that includes 'target remote | ssh ...'"""
if (
repro.auth is None
@ -574,7 +574,7 @@ class Repro(Endpoint):
def _dbg_windows(
self, repro: models.Repro, debug_command: Optional[str]
) -> Optional[str]:
""" Setup an SSH tunnel, then connect via CDB over SSH tunnel """
"""Setup an SSH tunnel, then connect via CDB over SSH tunnel"""
if (
repro.auth is None
@ -618,7 +618,7 @@ class Repro(Endpoint):
delete_after_use: bool = False,
debug_command: Optional[str] = None,
) -> Optional[str]:
""" Connect to an existing Reproduction VM """
"""Connect to an existing Reproduction VM"""
self.logger.info("connecting to reproduction VM: %s", vm_id)
@ -678,7 +678,7 @@ class Repro(Endpoint):
delete_after_use: bool = False,
debug_command: Optional[str] = None,
) -> Optional[str]:
""" Create and connect to a Reproduction VM """
"""Create and connect to a Reproduction VM"""
repro = self.create(container, path, duration=duration)
return self.connect(
repro.vm_id, delete_after_use=delete_after_use, debug_command=debug_command
@ -686,14 +686,14 @@ class Repro(Endpoint):
class Notifications(Endpoint):
""" Interact with models.Notifications """
"""Interact with models.Notifications"""
endpoint = "notifications"
def create(
self, container: primitives.Container, config: models.NotificationConfig
) -> models.Notification:
""" Create a notification based on a config file """
"""Create a notification based on a config file"""
config = requests.NotificationCreate(container=container, config=config.config)
return self._req_model("POST", models.Notification, data=config)
@ -701,7 +701,7 @@ class Notifications(Endpoint):
def create_teams(
self, container: primitives.Container, url: str
) -> models.Notification:
""" Create a Teams notification integration """
"""Create a Teams notification integration"""
self.logger.debug("create teams notification integration: %s", container)
@ -723,7 +723,7 @@ class Notifications(Endpoint):
on_dup_set_state: Optional[Dict[str, str]] = None,
on_dup_fields: Optional[Dict[str, str]] = None,
) -> models.Notification:
""" Create an Azure DevOps notification integration """
"""Create an Azure DevOps notification integration"""
self.logger.debug("create ado notification integration: %s", container)
@ -747,7 +747,7 @@ class Notifications(Endpoint):
return self.create(container, entry)
def delete(self, notification_id: UUID_EXPANSION) -> models.Notification:
""" Delete a notification integration """
"""Delete a notification integration"""
notification_id_expanded = self._disambiguate_uuid(
"notification_id",
@ -766,19 +766,19 @@ class Notifications(Endpoint):
)
def list(self) -> List[models.Notification]:
""" List notification integrations """
"""List notification integrations"""
self.logger.debug("listing notification integrations")
return self._req_model_list("GET", models.Notification)
class Tasks(Endpoint):
""" Interact with tasks """
"""Interact with tasks"""
endpoint = "tasks"
def delete(self, task_id: UUID_EXPANSION) -> models.Task:
""" Stop an individual task """
"""Stop an individual task"""
task_id_expanded = self._disambiguate_uuid(
"task_id", task_id, lambda: [str(x.task_id) for x in self.list()]
@ -791,7 +791,7 @@ class Tasks(Endpoint):
)
def get(self, task_id: UUID_EXPANSION) -> models.Task:
""" Get information about a task """
"""Get information about a task"""
task_id_expanded = self._disambiguate_uuid(
"task_id", task_id, lambda: [str(x.task_id) for x in self.list()]
)
@ -803,7 +803,7 @@ class Tasks(Endpoint):
)
def create_with_config(self, config: models.TaskConfig) -> models.Task:
""" Create a Task using TaskConfig """
"""Create a Task using TaskConfig"""
return self._req_model("POST", models.Task, data=config)
@ -924,7 +924,7 @@ class Tasks(Endpoint):
job_id: Optional[UUID_EXPANSION] = None,
state: Optional[List[enums.TaskState]] = enums.TaskState.available(),
) -> List[models.Task]:
""" Get information about all tasks """
"""Get information about all tasks"""
self.logger.debug("list tasks")
job_id_expanded: Optional[UUID] = None
@ -943,7 +943,7 @@ class Tasks(Endpoint):
class JobContainers(Endpoint):
""" Interact with Containers used within tasks in a Job """
"""Interact with Containers used within tasks in a Job"""
endpoint = "jobs"
@ -972,17 +972,17 @@ class JobContainers(Endpoint):
class JobTasks(Endpoint):
""" Interact with tasks within a job """
"""Interact with tasks within a job"""
endpoint = "jobs"
def list(self, job_id: UUID_EXPANSION) -> List[models.Task]:
""" List all of the tasks for a given job """
"""List all of the tasks for a given job"""
return self.onefuzz.tasks.list(job_id=job_id, state=[])
class Jobs(Endpoint):
""" Interact with Jobs """
"""Interact with Jobs"""
endpoint = "jobs"
@ -993,7 +993,7 @@ class Jobs(Endpoint):
def delete(self, job_id: UUID_EXPANSION) -> models.Job:
""" Stop a job and all tasks that make up a job """
"""Stop a job and all tasks that make up a job"""
job_id_expanded = self._disambiguate_uuid(
"job_id", job_id, lambda: [str(x.job_id) for x in self.list()]
)
@ -1004,7 +1004,7 @@ class Jobs(Endpoint):
)
def get(self, job_id: UUID_EXPANSION) -> models.Job:
""" Get information about a specific job """
"""Get information about a specific job"""
job_id_expanded = self._disambiguate_uuid(
"job_id", job_id, lambda: [str(x.job_id) for x in self.list()]
)
@ -1015,7 +1015,7 @@ class Jobs(Endpoint):
return job
def create_with_config(self, config: models.JobConfig) -> models.Job:
""" Create a job """
"""Create a job"""
self.logger.debug(
"create job: project:%s name:%s build:%s",
config.project,
@ -1031,7 +1031,7 @@ class Jobs(Endpoint):
def create(
self, project: str, name: str, build: str, duration: int = 24
) -> models.Job:
""" Create a job """
"""Create a job"""
return self.create_with_config(
models.JobConfig(project=project, name=name, build=build, duration=duration)
)
@ -1040,7 +1040,7 @@ class Jobs(Endpoint):
self,
job_state: Optional[List[enums.JobState]] = enums.JobState.available(),
) -> List[models.Job]:
""" Get information about all jobs """
"""Get information about all jobs"""
self.logger.debug("list jobs")
return self._req_model_list(
"GET", models.Job, data=requests.JobSearch(state=job_state)
@ -1048,7 +1048,7 @@ class Jobs(Endpoint):
class Pool(Endpoint):
""" Interact with worker pools """
"""Interact with worker pools"""
endpoint = "pool"
@ -1078,7 +1078,7 @@ class Pool(Endpoint):
)
def get_config(self, pool_name: primitives.PoolName) -> models.AgentConfig:
""" Get the agent configuration for the pool """
"""Get the agent configuration for the pool"""
pool = self.get(pool_name)
@ -1125,7 +1125,7 @@ class Pool(Endpoint):
class Node(Endpoint):
""" Interact with nodes """
"""Interact with nodes"""
endpoint = "node"
@ -1248,7 +1248,7 @@ class Node(Endpoint):
class Scaleset(Endpoint):
""" Interact with managed scaleset pools """
"""Interact with managed scaleset pools"""
endpoint = "scaleset"
@ -1373,7 +1373,7 @@ class Scaleset(Endpoint):
class ScalesetProxy(Endpoint):
""" Interact with Scaleset Proxies (NOTE: This API is unstable) """
"""Interact with Scaleset Proxies (NOTE: This API is unstable)"""
endpoint = "proxy"
@ -1384,7 +1384,7 @@ class ScalesetProxy(Endpoint):
*,
dst_port: Optional[int] = None,
) -> responses.BoolResult:
""" Stop a proxy node """
"""Stop a proxy node"""
(
scaleset,
@ -1408,7 +1408,7 @@ class ScalesetProxy(Endpoint):
)
def reset(self, region: primitives.Region) -> responses.BoolResult:
""" Reset the proxy for an existing region """
"""Reset the proxy for an existing region"""
return self._req_model(
"PATCH", responses.BoolResult, data=requests.ProxyReset(region=region)
@ -1417,7 +1417,7 @@ class ScalesetProxy(Endpoint):
def get(
self, scaleset_id: UUID_EXPANSION, machine_id: UUID_EXPANSION, dst_port: int
) -> responses.ProxyGetResult:
""" Get information about a specific job """
"""Get information about a specific job"""
(
scaleset,
machine_id_expanded,
@ -1445,7 +1445,7 @@ class ScalesetProxy(Endpoint):
*,
duration: Optional[int] = 1,
) -> responses.ProxyGetResult:
""" Create a proxy """
"""Create a proxy"""
(
scaleset,
machine_id_expanded,
@ -1533,27 +1533,27 @@ class Onefuzz:
self.job_templates._load_cache()
def licenses(self) -> object:
""" Return third-party licenses used by this package """
"""Return third-party licenses used by this package"""
data = pkgutil.get_data("onefuzz", "data/licenses.json")
if data is None:
raise Exception("missing licenses.json")
return json.loads(data)
def privacy_statement(self) -> bytes:
""" Return OneFuzz privacy statement """
"""Return OneFuzz privacy statement"""
data = pkgutil.get_data("onefuzz", "data/privacy.txt")
if data is None:
raise Exception("missing licenses.json")
return data
def logout(self) -> None:
""" Logout of Onefuzz """
"""Logout of Onefuzz"""
self.logger.debug("logout")
self._backend.logout()
def login(self) -> str:
""" Login to Onefuzz """
"""Login to Onefuzz"""
# Rather than interacting MSAL directly, call a simple API which
# actuates the login process
@ -1573,7 +1573,7 @@ class Onefuzz:
enable_feature: Optional[PreviewFeature] = None,
tenant_domain: Optional[str] = None,
) -> BackendConfig:
""" Configure onefuzz CLI """
"""Configure onefuzz CLI"""
self.logger.debug("set config")
if endpoint is not None: