mirror of https://github.com/microsoft/onefuzz.git

address multiple issues found by pylint (#206)
@@ -59,7 +59,7 @@ def is_authorized(token_data: TokenData) -> bool:


 def verify_token(
-    req: func.HttpRequest, func: Callable[[func.HttpRequest], func.HttpResponse]
+    req: func.HttpRequest, method: Callable[[func.HttpRequest], func.HttpResponse]
 ) -> func.HttpResponse:
     token = try_get_token_auth_header(req)

@@ -79,4 +79,4 @@ def verify_token(
         context="token verification",
     )

-    return func(req)
+    return method(req)
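Note: this rename fixes pylint's redefined-outer-name warning: the parameter `func` shadowed the `azure.functions` module, which this file imports as `func`, making the module unreachable inside the function body. A minimal sketch of the same hazard, using the `json` module as a stand-in:

    import json

    # The parameter "json" shadows the imported module inside the body,
    # so json.dumps() here would resolve against the argument instead.
    def render_shadowed(data, json):
        return json(data)

    # A distinct parameter name keeps the module usable.
    def render_fixed(data, encoder):
        return encoder(json.dumps(data))

    print(render_fixed({"a": 1}, str))  # -> '{"a": 1}'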
@@ -25,8 +25,10 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:

         max_size = min(scaleset.max_size(), autoscale_config.scaleset_size)
         logging.info(
-            "scaleset:%s size:%d max_size:%d"
-            % (scaleset.scaleset_id, scaleset.size, max_size)
+            "scaleset:%s size:%d max_size:%d",
+            scaleset.scaleset_id,
+            scaleset.size,
+            max_size,
         )
         if scaleset.size < max_size:
             current_size = scaleset.size
@@ -54,7 +56,7 @@ def scale_up(pool: Pool, scalesets: List[Scaleset], nodes_needed: int) -> None:
                 )
             )
         ):
-            logging.info("Creating Scaleset for Pool %s" % (pool.name))
+            logging.info("Creating Scaleset for Pool %s", pool.name)
             max_nodes_scaleset = min(
                 Scaleset.scaleset_max_size(autoscale_config.image),
                 autoscale_config.scaleset_size,
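Note: the logging hunks in this file all make the same change: eager `%` interpolation is replaced with logging's lazy argument passing (pylint W1201, logging-not-lazy). The template is only formatted if a handler actually emits the record, and log aggregators see the constant template rather than a pre-rendered string. A sketch:

    import logging

    scaleset_id, size, max_size = "scaleset-1", 3, 10

    # Eager: the message string is built even when INFO is disabled.
    logging.info("scaleset:%s size:%d max_size:%d" % (scaleset_id, size, max_size))

    # Lazy: logging formats the template only if the record is emitted.
    logging.info("scaleset:%s size:%d max_size:%d", scaleset_id, size, max_size)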
@@ -97,9 +99,7 @@ def scale_down(scalesets: List[Scaleset], nodes_to_remove: int) -> None:
         for node in free_nodes:
             if not node.delete_requested:
                 nodes.append(node)
-        logging.info(
-            "Scaleset: %s, #Free Nodes: %s" % (scaleset.scaleset_id, len(nodes))
-        )
+        logging.info("Scaleset: %s, #Free Nodes: %s", scaleset.scaleset_id, len(nodes))

         if nodes and nodes_to_remove > 0:
             max_nodes_remove = min(len(nodes), nodes_to_remove)
@@ -134,13 +134,13 @@ def get_vm_count(tasks: List[Task]) -> int:


 def autoscale_pool(pool: Pool) -> None:
-    logging.info("autoscale: %s" % (pool.autoscale))
+    logging.info("autoscale: %s", pool.autoscale)
     if not pool.autoscale:
         return

     # get all the tasks (count not stopped) for the pool
     tasks = Task.get_tasks_by_pool_name(pool.name)
-    logging.info("Pool: %s, #Tasks %d" % (pool.name, len(tasks)))
+    logging.info("Pool: %s, #Tasks %d", pool.name, len(tasks))

     num_of_tasks = get_vm_count(tasks)
     nodes_needed = max(num_of_tasks, pool.autoscale.min_size)
@@ -160,7 +160,7 @@ def autoscale_pool(pool: Pool) -> None:
     if pool_resize:
         return

-    logging.info("Pool: %s, #Nodes Needed: %d" % (pool.name, nodes_needed))
+    logging.info("Pool: %s, #Nodes Needed: %d", pool.name, nodes_needed)
     if nodes_needed > 0:
         # resizing scaleset or creating new scaleset.
         scale_up(pool, scalesets, nodes_needed)
@@ -94,7 +94,7 @@ def clear_queue(name: QueueNameType, *, account_id: str) -> None:
     try:
         queue.clear_messages()
     except ResourceNotFoundError:
-        return None
+        pass


 def send_message(
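Note: in a function annotated `-> None`, an explicit `return None` as the last reachable statement is what pylint flags as R1711 (useless-return); `pass` states the intent, that ResourceNotFoundError is deliberately swallowed, without implying a meaningful return value. Sketch, with KeyError standing in for ResourceNotFoundError:

    class FakeQueue:
        def clear_messages(self) -> None:
            raise KeyError("queue not found")  # stand-in for ResourceNotFoundError

    def clear_queue(queue: FakeQueue) -> None:
        try:
            queue.clear_messages()
        except KeyError:
            pass  # the queue is already gone; nothing left to clear

    clear_queue(FakeQueue())  # completes silently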
@@ -39,7 +39,6 @@ def delete_subnet(resource_group: str, name: str) -> Union[None, CloudError, Any
                 "subnet delete failed: %s %s : %s", resource_group, name, repr(err)
             )
             return None
-        else:
-            raise err
+        raise err


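Note: this is pylint's no-else-return refactor (R1705): once the `if` branch ends in `return`, the `else:` only adds an indentation level, so `raise err` moves out a level with identical control flow. Sketch:

    def handle(recoverable: bool, err: Exception) -> None:
        if recoverable:
            print("logged and ignored")
            return None
        # No "else:" needed - this line is reached only when the branch
        # above did not return.
        raise err

    handle(True, ValueError("boom"))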
@@ -119,7 +119,7 @@ def get_extension(vm_name: str, extension_name: str) -> Optional[Any]:
     resource_group = get_base_resource_group()

     logging.debug(
-        "getting extension: %s:%s:%s - %s",
+        "getting extension: %s:%s:%s",
         resource_group,
         vm_name,
         extension_name,
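Note: the old template had four placeholders (`%s:%s:%s - %s`) but only three arguments follow it, so formatting the record would fail with "not enough arguments for format string" (pylint's logging-format checks catch this statically). Dropping the dangling placeholder makes them match. Sketch:

    import logging

    logging.basicConfig(level=logging.DEBUG)
    rg, vm, ext = "my-rg", "vm-1", "OMSExtension"

    # Four placeholders, three args: logging prints an internal error
    # traceback when the record is formatted.
    logging.debug("getting extension: %s:%s:%s - %s", rg, vm, ext)

    # Placeholders now match the arguments.
    logging.debug("getting extension: %s:%s:%s", rg, vm, ext)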
@@ -28,19 +28,19 @@ from .reports import get_report
     # return commands


-def generic_extensions(region: Region, os: OS) -> List[Extension]:
-    extensions = [monitor_extension(region, os)]
-    depedency = dependency_extension(region, os)
+def generic_extensions(region: Region, vm_os: OS) -> List[Extension]:
+    extensions = [monitor_extension(region, vm_os)]
+    depedency = dependency_extension(region, vm_os)
     if depedency:
         extensions.append(depedency)

     return extensions


-def monitor_extension(region: Region, os: OS) -> Extension:
+def monitor_extension(region: Region, vm_os: OS) -> Extension:
     settings = get_monitor_settings()

-    if os == OS.windows:
+    if vm_os == OS.windows:
         return {
             "name": "OMSExtension",
             "publisher": "Microsoft.EnterpriseCloud.Monitoring",
@@ -51,7 +51,7 @@ def monitor_extension(region: Region, os: OS) -> Extension:
             "settings": {"workspaceId": settings["id"]},
             "protectedSettings": {"workspaceKey": settings["key"]},
         }
-    elif os == OS.linux:
+    elif vm_os == OS.linux:
         return {
             "name": "OMSExtension",
             "publisher": "Microsoft.EnterpriseCloud.Monitoring",
@@ -62,11 +62,11 @@ def monitor_extension(region: Region, os: OS) -> Extension:
             "settings": {"workspaceId": settings["id"]},
             "protectedSettings": {"workspaceKey": settings["key"]},
         }
-    raise NotImplementedError("unsupported os: %s" % os)
+    raise NotImplementedError("unsupported os: %s" % vm_os)


-def dependency_extension(region: Region, os: OS) -> Optional[Extension]:
-    if os == OS.windows:
+def dependency_extension(region: Region, vm_os: OS) -> Optional[Extension]:
+    if vm_os == OS.windows:
         extension = {
             "name": "DependencyAgentWindows",
             "publisher": "Microsoft.Azure.Monitoring.DependencyAgent",
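Note: every `os` → `vm_os` rename in this file addresses pylint W0621 (redefined-outer-name): the file imports the `os` module (it reads `os.environ` in `build_pool_config` below), and a parameter named `os` hides that module for the whole function body. A compact sketch:

    import os
    from enum import Enum

    class OS(Enum):
        linux = "linux"
        windows = "windows"

    # With a parameter named "os", os.environ would resolve against the
    # enum value and fail; "vm_os" leaves the module visible.
    def describe(vm_os: OS) -> str:
        user = os.environ.get("USER", "unknown")
        return "%s (provisioned by %s)" % (vm_os.value, user)

    print(describe(OS.linux))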
@@ -90,13 +90,13 @@ def dependency_extension(region: Region, os: OS) -> Optional[Extension]:


 def build_pool_config(pool_name: str) -> str:
-    agent_config = AgentConfig(
+    config = AgentConfig(
         pool_name=pool_name,
         onefuzz_url=get_instance_url(),
         instrumentation_key=os.environ.get("APPINSIGHTS_INSTRUMENTATIONKEY"),
         heartbeat_queue=get_queue_sas(
             "node-heartbeat",
-            account_id=os.environ["ONEFUZZ_FUNC_STORAGE"],
+            account_id=get_func_storage(),
             add=True,
         ),
         telemetry_key=os.environ.get("ONEFUZZ_TELEMETRY"),
@@ -105,7 +105,7 @@ def build_pool_config(pool_name: str) -> str:
     save_blob(
         "vm-scripts",
         "%s/config.json" % pool_name,
-        agent_config.json(),
+        config.json(),
         account_id=get_func_storage(),
     )

@@ -117,7 +117,7 @@ def build_pool_config(pool_name: str) -> str:
     )


-def update_managed_scripts(mode: AgentMode) -> None:
+def update_managed_scripts() -> None:
     commands = [
         "azcopy sync '%s' instance-specific-setup"
         % (
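Note: two related findings here. The local variable `agent_config` shadowed the module-level `agent_config` function defined further down in this file (W0621 again), so it becomes plain `config`; and `update_managed_scripts` never read its `mode` parameter (pylint W0613, unused-argument), so the parameter is removed along with its call sites. Sketch of the unused-argument fix:

    # Before: "mode" is accepted but never used (pylint W0613).
    def update_scripts(mode: str) -> None:
        print("syncing managed scripts")

    # After: the dead parameter is gone, and callers pass nothing.
    def update_scripts_fixed() -> None:
        print("syncing managed scripts")

    update_scripts_fixed()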
@@ -151,14 +151,14 @@ def update_managed_scripts(mode: AgentMode) -> None:


 def agent_config(
-    region: Region, os: OS, mode: AgentMode, *, urls: Optional[List[str]] = None
+    region: Region, vm_os: OS, mode: AgentMode, *, urls: Optional[List[str]] = None
 ) -> Extension:
-    update_managed_scripts(mode)
+    update_managed_scripts()

     if urls is None:
         urls = []

-    if os == OS.windows:
+    if vm_os == OS.windows:
         urls += [
             get_file_sas_url(
                 "vm-scripts",
@@ -200,7 +200,7 @@ def agent_config(
             "protectedSettings": {},
         }
         return extension
-    elif os == OS.linux:
+    elif vm_os == OS.linux:
         urls += [
             get_file_sas_url(
                 "vm-scripts",
@@ -235,13 +235,13 @@ def agent_config(
         }
         return extension

-    raise NotImplementedError("unsupported OS: %s" % os)
+    raise NotImplementedError("unsupported OS: %s" % vm_os)


-def fuzz_extensions(region: Region, os: OS, pool_name: str) -> List[Extension]:
+def fuzz_extensions(region: Region, vm_os: OS, pool_name: str) -> List[Extension]:
     urls = [build_pool_config(pool_name)]
-    fuzz_extension = agent_config(region, os, AgentMode.fuzz, urls=urls)
-    extensions = generic_extensions(region, os)
+    fuzz_extension = agent_config(region, vm_os, AgentMode.fuzz, urls=urls)
+    extensions = generic_extensions(region, vm_os)
     extensions += [fuzz_extension]
     return extensions

@@ -217,9 +217,9 @@ def notify_ado(
         fail_task(report, err)
     except AzureDevOpsClientError as err:
         fail_task(report, err)
-    except AzureDevOpsClientRequestError as err:
-        fail_task(report, err)
     except AzureDevOpsServiceError as err:
         fail_task(report, err)
+    except AzureDevOpsClientRequestError as err:
+        fail_task(report, err)
     except ValueError as err:
         fail_task(report, err)
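Note: reordering the handlers looks like a fix for pylint E0701 (bad-except-order). In the azure-devops client library, `AzureDevOpsServiceError` appears to derive from `AzureDevOpsClientRequestError` (an assumption inferred from this change), so catching the base class first made the `AzureDevOpsServiceError` handler unreachable. Generic sketch:

    class RequestError(Exception):
        pass

    class ServiceError(RequestError):  # subclass, like AzureDevOpsServiceError
        pass

    def classify(err: Exception) -> str:
        try:
            raise err
        except ServiceError:
            # Must come first: if RequestError were listed before it,
            # this handler would be dead code (pylint E0701).
            return "service"
        except RequestError:
            return "request"

    print(classify(ServiceError()))  # -> "service"
    print(classify(RequestError()))  # -> "request"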
@@ -923,7 +923,7 @@ class Scaleset(BASE_SCALESET, ORMMixin):

     def update_nodes(self) -> None:
         # Be in at-least 'setup' before checking for the list of VMs
-        if self.state == self.init:
+        if self.state == ScalesetState.init:
             return

         nodes = Node.search_states(scaleset_id=self.scaleset_id)
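Note: this one is a behavior fix, not just style. `self.init` appears to be the scaleset's `init` state-transition method, so `self.state == self.init` compared an enum value to a bound method and was always false (pylint W0143, comparison-with-callable); comparing against `ScalesetState.init` restores the intended early return. Sketch:

    from enum import Enum

    class State(Enum):
        init = "init"
        running = "running"

    class Scaleset:
        def __init__(self) -> None:
            self.state = State.init

        def init(self) -> None:  # transition handler sharing the member's name
            pass

        def should_skip(self) -> bool:
            # self.state == self.init would compare against the bound
            # method and always be False (pylint W0143).
            return self.state == State.init

    print(Scaleset().should_skip())  # -> True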
@@ -958,7 +958,8 @@ class Scaleset(BASE_SCALESET, ORMMixin):
         pool = Pool.get_by_name(self.pool_name)
         if isinstance(pool, Error):
             self.error = pool
-            return self.halt()
+            self.halt()
+            return

         logging.debug("updating scaleset configs: %s", self.scaleset_id)
         extensions = fuzz_extensions(self.region, pool.os, self.pool_name)
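Note: `halt()` returns `None` (an assumption consistent with the bare `return` the commit adds), so `return self.halt()` conflated "call this" with "return its value". Splitting it into a call followed by a bare `return` keeps control flow identical and reads unambiguously; the same mechanical rewrite recurs in the `Repro` hunks below. Sketch:

    def halt() -> None:
        print("halting")

    # Before: returns the None produced by a None-returning call.
    def stop_old() -> None:
        return halt()

    # After: the side effect and the early exit are separate statements.
    def stop_new() -> None:
        halt()
        return

    stop_new()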
@@ -46,7 +46,7 @@ class Repro(BASE_REPRO, ORMMixin):
     def get_vm(self) -> VM:
         task = Task.get_by_task_id(self.task_id)
         if isinstance(task, Error):
-            raise Exception("previously existing task missing: %s", self.task_id)
+            raise Exception("previously existing task missing: %s" % self.task_id)

         vm_config = task.get_repro_vm_config()
         if vm_config is None:
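Note: unlike `logging`, exception constructors do no lazy formatting: `Exception("...: %s", task_id)` simply stores a two-element args tuple, so the message prints as a tuple with the `%s` left verbatim. Pylint flags this as W0715 (raising-format-tuple); the fix interpolates with `%` before raising. The same correction appears in `check_containers` and `DebugNotification` further down. Sketch:

    task_id = "1234"

    # Before: the comma builds an args tuple; nothing is interpolated.
    broken = Exception("previously existing task missing: %s", task_id)
    print(broken)  # ('previously existing task missing: %s', '1234')

    # After: format first, then construct the exception.
    fixed = Exception("previously existing task missing: %s" % task_id)
    print(fixed)   # previously existing task missing: 1234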
@@ -78,13 +78,15 @@ class Repro(BASE_REPRO, ORMMixin):
             else:
                 script_result = self.build_repro_script()
                 if isinstance(script_result, Error):
-                    return self.set_error(script_result)
+                    self.set_error(script_result)
+                    return

                 self.state = VmState.extensions_launch
         else:
             result = vm.create()
             if isinstance(result, Error):
-                return self.set_error(result)
+                self.set_error(result)
+                return
         self.save()

     def set_failed(self, vm_data: VirtualMachine) -> None:
@@ -108,15 +110,17 @@ class Repro(BASE_REPRO, ORMMixin):
         vm = self.get_vm()
         vm_data = vm.get()
         if not vm_data:
-            return self.set_error(
+            self.set_error(
                 Error(
                     code=ErrorCode.VM_CREATE_FAILED,
                     errors=["failed before launching extensions"],
                 )
             )
+            return

         if vm_data.provisioning_state == "Failed":
-            return self.set_failed(vm_data)
+            self.set_failed(vm_data)
+            return

         if not self.ip:
             self.ip = get_public_ip(vm_data.network_profile.network_interfaces[0].id)
@@ -126,7 +130,8 @@ class Repro(BASE_REPRO, ORMMixin):
         )
         result = vm.add_extensions(extensions)
         if isinstance(result, Error):
-            return self.set_error(result)
+            self.set_error(result)
+            return
         elif result:
             self.state = VmState.running

@@ -80,8 +80,8 @@ def ok(
 def not_ok(
     error: Error, *, status_code: int = 400, context: Union[str, UUID]
 ) -> HttpResponse:
-    if 400 <= status_code and status_code <= 599:
-        logging.error("request error - %s: %s" % (str(context), error.json()))
+    if 400 <= status_code <= 599:
+        logging.error("request error - %s: %s", str(context), error.json())

     return HttpResponse(
         error.json(), status_code=status_code, mimetype="application/json"
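Note: two small fixes in `not_ok`. Python supports chained comparisons, so `400 <= status_code <= 599` replaces the `and` of two comparisons (pylint R1716, chained-comparison) and evaluates `status_code` once; the `logging.error` call also switches to lazy arguments, as in the earlier hunks. Sketch:

    status_code = 404

    # Before: two comparisons joined with "and" (pylint R1716).
    if 400 <= status_code and status_code <= 599:
        print("error range")

    # After: a single chained comparison.
    if 400 <= status_code <= 599:
        print("error range")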
@@ -79,7 +79,7 @@ def check_containers(definition: TaskDefinition, config: TaskConfig) -> None:
     for container_type in containers:
         if container_type not in [x.type for x in definition.containers]:
             raise TaskConfigError(
-                "unsupported container type for this task: %s", container_type.name
+                "unsupported container type for this task: %s" % container_type.name
             )

     if definition.monitor_queue:
@@ -320,7 +320,7 @@ class DebugNotification(Command):

         if reports is None:
             raise Exception(
-                "task does not have a %s container", report_container_type.name
+                "task does not have a %s container" % report_container_type.name
             )

         with tempfile.TemporaryDirectory() as tempdir:
@@ -19,7 +19,7 @@ from .top_view import render

 def background_task(queue: PriorityQueue) -> None:
     while True:
-        (priority, entry) = queue.get(block=True)
+        (_, entry) = queue.get(block=True)
         if entry is None:
             queue.task_done()
             return
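Note: `priority` was unpacked but never read (pylint W0612, unused-variable); the `_` name marks it as deliberately discarded while keeping the tuple unpacking intact. Sketch:

    from queue import PriorityQueue

    q: PriorityQueue = PriorityQueue()
    q.put((1, "entry"))

    # "_" documents that the priority is intentionally ignored.
    (_, entry) = q.get(block=True)
    print(entry)  # -> entry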
@@ -80,11 +80,11 @@ class Template(Command):
                 self.logger.info("not removing: %s", container)
                 continue
             to_remove.append(container.name)
-        for name in to_remove:
-            if name in containers:
-                self.logger.info("removing container: %s", name)
-                self.onefuzz.containers.delete(name)
-                containers.remove(name)
+        for container_name in to_remove:
+            if container_name in containers:
+                self.logger.info("removing container: %s", container_name)
+                self.onefuzz.containers.delete(container_name)
+                containers.remove(container_name)

         if stop_notifications:
             notifications = self.onefuzz.notifications.list()
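Note: the loop variable `name` becomes `container_name`, most likely because it shadowed a `name` in an enclosing scope (pylint W0621, redefined-outer-name; the exact outer binding is not visible in this hunk). A sketch of the pattern, with the outer `name` as a hypothetical parameter:

    def stop_template(name: str, to_remove: list, containers: set) -> None:
        # Reusing "name" for the loop variable would shadow the parameter;
        # a distinct identifier keeps both meanings readable.
        for container_name in to_remove:
            if container_name in containers:
                print("removing %s from template %s" % (container_name, name))
                containers.remove(container_name)

    stop_template("demo", ["c1"], {"c1", "c2"})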