Add linting to deployment tools (#332)

This commit is contained in:
bmc-msft
2020-11-20 13:00:19 -05:00
committed by GitHub
parent 9e2a61fe66
commit 3ddb756504
5 changed files with 118 additions and 76 deletions

View File

@ -270,6 +270,18 @@ jobs:
with:
name: build-artifacts
path: artifacts
- uses: actions/setup-python@v2
with:
python-version: 3.7
- name: Lint
shell: bash
run: |
set -ex
cd src/deployment
pip install mypy isort black
mypy .
isort --profile black . --check
black . --check
- name: Package Onefuzz
run: |
set -ex

View File

@ -4,14 +4,14 @@
# Licensed under the MIT License.
import argparse
from uuid import UUID
import json
from typing import Callable, Dict, List
from uuid import UUID
from azure.common.client_factory import get_client_from_cli_profile
from azure.cosmosdb.table.tablebatch import TableBatch
from azure.cosmosdb.table.tableservice import TableService
from azure.mgmt.storage import StorageManagementClient
from azure.common.client_factory import get_client_from_cli_profile
def migrate_task_os(table_service: TableService) -> None:
@ -84,7 +84,7 @@ def migrate(table_service: TableService, migration_names: List[str]) -> None:
print("migration '%s' applied" % name)
def main():
def main() -> None:
formatter = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(formatter_class=formatter)
parser.add_argument("resource_group")

View File

@ -16,7 +16,8 @@ import time
import uuid
import zipfile
from datetime import datetime, timedelta
from typing import Optional
from typing import Dict, List, Optional, Tuple, Union, cast
from uuid import UUID
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import get_cli_profile
@ -62,11 +63,11 @@ from msrest.serialization import TZ_UTC
from data_migration import migrate
from registration import (
OnefuzzAppRole,
add_application_password,
assign_scaleset_role,
authorize_application,
get_application,
OnefuzzAppRole,
register_application,
update_pool_registration,
)
@ -94,27 +95,28 @@ FUNC_TOOLS_ERROR = (
logger = logging.getLogger("deploy")
def gen_guid():
def gen_guid() -> str:
return str(uuid.uuid4())
class Client:
def __init__(
self,
resource_group,
location,
application_name,
owner,
client_id,
client_secret,
app_zip,
tools,
instance_specific,
third_party,
arm_template,
workbook_data,
create_registration,
migrations,
*,
resource_group: str,
location: str,
application_name: str,
owner: str,
client_id: Optional[str],
client_secret: Optional[str],
app_zip: str,
tools: str,
instance_specific: str,
third_party: str,
arm_template: str,
workbook_data: str,
create_registration: bool,
migrations: List[str],
export_appinsights: bool,
log_service_principal: bool,
upgrade: bool,
@ -130,11 +132,11 @@ class Client:
self.third_party = third_party
self.create_registration = create_registration
self.upgrade = upgrade
self.results = {
self.results: Dict = {
"client_id": client_id,
"client_secret": client_secret,
}
self.cli_config = {
self.cli_config: Dict[str, Union[str, UUID]] = {
"client_id": ONEFUZZ_CLI_APP,
"authority": ONEFUZZ_CLI_AUTHORITY,
}
@ -161,22 +163,22 @@ class Client:
with open(workbook_data) as f:
self.workbook_data = json.load(f)
def get_subscription_id(self):
def get_subscription_id(self) -> str:
profile = get_cli_profile()
return profile.get_subscription_id()
return cast(str, profile.get_subscription_id())
def get_location_display_name(self):
def get_location_display_name(self) -> str:
location_client = get_client_from_cli_profile(SubscriptionClient)
locations = location_client.subscriptions.list_locations(
self.get_subscription_id()
)
for location in locations:
if location.name == self.location:
return location.display_name
return cast(str, location.display_name)
raise Exception("unknown location: %s", self.location)
def check_region(self):
def check_region(self) -> None:
# At the moment, this only checks are the specified providers available
# in the selected region
@ -223,7 +225,7 @@ class Client:
print("\n".join(["* " + x for x in unsupported]))
sys.exit(1)
def create_password(self, object_id):
def create_password(self, object_id: UUID) -> Tuple[str, str]:
# Work-around the race condition where the app is created but passwords cannot
# be created yet.
count = 0
@ -238,7 +240,7 @@ class Client:
if count > timeout_seconds / wait:
raise Exception("creating password failed, trying again")
def setup_rbac(self):
def setup_rbac(self) -> None:
"""
Setup the client application for the OneFuzz instance.
@ -281,6 +283,8 @@ class Client:
),
]
app: Optional[Application] = None
if not existing:
logger.info("creating Application registration")
url = "https://%s.azurewebsites.net" % self.application_name
@ -311,7 +315,7 @@ class Client:
)
client.service_principals.create(service_principal_params)
else:
app: Application = existing[0]
app = existing[0]
existing_role_values = [app_role.value for app_role in app.app_roles]
has_missing_roles = any(
[role.value not in existing_role_values for role in app_roles]
@ -365,7 +369,7 @@ class Client:
else:
logger.debug("client_id: %s client_secret: %s", app.app_id, password)
def deploy_template(self):
def deploy_template(self) -> None:
logger.info("deploying arm template: %s", self.arm_template)
with open(self.arm_template, "r") as template_handle:
@ -403,7 +407,7 @@ class Client:
sys.exit(1)
self.results["deploy"] = result.properties.outputs
def assign_scaleset_identity_role(self):
def assign_scaleset_identity_role(self) -> None:
if self.upgrade:
logger.info("Upgrading: skipping assignment of the managed identity role")
return
@ -413,14 +417,14 @@ class Client:
self.results["deploy"]["scaleset-identity"]["value"],
)
def apply_migrations(self):
def apply_migrations(self) -> None:
self.results["deploy"]["func-storage"]["value"]
name = self.results["deploy"]["func-name"]["value"]
key = self.results["deploy"]["func-key"]["value"]
table_service = TableService(account_name=name, account_key=key)
migrate(table_service, self.migrations)
def create_queues(self):
def create_queues(self) -> None:
logger.info("creating eventgrid destination queue")
name = self.results["deploy"]["func-name"]["value"]
@ -443,7 +447,7 @@ class Client:
except ResourceExistsError:
pass
def create_eventgrid(self):
def create_eventgrid(self) -> None:
logger.info("creating eventgrid subscription")
src_resource_id = self.results["deploy"]["fuzz-storage"]["value"]
dst_resource_id = self.results["deploy"]["func-storage"]["value"]
@ -474,7 +478,7 @@ class Client:
% json.dumps(result.as_dict(), indent=4, sort_keys=True),
)
def add_instance_id(self):
def add_instance_id(self) -> None:
logger.info("setting instance_id log export")
container_name = "base-config"
@ -497,7 +501,7 @@ class Client:
logger.info("instance_id: %s", instance_id)
def add_log_export(self):
def add_log_export(self) -> None:
if not self.export_appinsights:
logger.info("not exporting appinsights")
return
@ -561,7 +565,7 @@ class Client:
self.resource_group, self.application_name, req
)
def upload_tools(self):
def upload_tools(self) -> None:
logger.info("uploading tools from %s", self.tools)
account_name = self.results["deploy"]["func-name"]["value"]
key = self.results["deploy"]["func-key"]["value"]
@ -587,7 +591,7 @@ class Client:
[self.azcopy, "sync", self.tools, url, "--delete-destination", "true"]
)
def upload_instance_setup(self):
def upload_instance_setup(self) -> None:
logger.info("uploading instance-specific-setup from %s", self.instance_specific)
account_name = self.results["deploy"]["func-name"]["value"]
key = self.results["deploy"]["func-key"]["value"]
@ -622,7 +626,7 @@ class Client:
]
)
def upload_third_party(self):
def upload_third_party(self) -> None:
logger.info("uploading third-party tools from %s", self.third_party)
account_name = self.results["deploy"]["fuzz-name"]["value"]
key = self.results["deploy"]["fuzz-key"]["value"]
@ -654,10 +658,13 @@ class Client:
[self.azcopy, "sync", path, url, "--delete-destination", "true"]
)
def deploy_app(self):
def deploy_app(self) -> None:
logger.info("deploying function app %s", self.app_zip)
with tempfile.TemporaryDirectory() as tmpdirname:
with zipfile.ZipFile(self.app_zip, "r") as zip_ref:
func = shutil.which("func")
assert func is not None
zip_ref.extractall(tmpdirname)
error: Optional[subprocess.CalledProcessError] = None
max_tries = 5
@ -665,7 +672,7 @@ class Client:
try:
subprocess.check_output(
[
shutil.which("func"),
func,
"azure",
"functionapp",
"publish",
@ -688,12 +695,12 @@ class Client:
if error is not None:
raise error
def update_registration(self):
def update_registration(self) -> None:
if not self.create_registration:
return
update_pool_registration(self.application_name)
def done(self):
def done(self) -> None:
logger.info(TELEMETRY_NOTICE)
client_secret_arg = (
("--client_secret %s" % self.cli_config["client_secret"])
@ -710,19 +717,19 @@ class Client:
)
def arg_dir(arg):
def arg_dir(arg: str) -> str:
if not os.path.isdir(arg):
raise argparse.ArgumentTypeError("not a directory: %s" % arg)
return arg
def arg_file(arg):
def arg_file(arg: str) -> str:
if not os.path.isfile(arg):
raise argparse.ArgumentTypeError("not a file: %s" % arg)
return arg
def main():
def main() -> None:
states = [
("check_region", Client.check_region),
("rbac", Client.setup_rbac),
@ -826,23 +833,23 @@ def main():
sys.exit(1)
client = Client(
args.resource_group,
args.location,
args.application_name,
args.owner,
args.client_id,
args.client_secret,
args.app_zip,
args.tools,
args.instance_specific,
args.third_party,
args.arm_template,
args.workbook_data,
args.create_pool_registration,
args.apply_migrations,
args.export_appinsights,
args.log_service_principal,
args.upgrade,
resource_group=args.resource_group,
location=args.location,
application_name=args.application_name,
owner=args.owner,
client_id=args.client_id,
client_secret=args.client_secret,
app_zip=args.app_zip,
tools=args.tools,
instance_specific=args.instance_specific,
third_party=args.third_party,
arm_template=args.arm_template,
workbook_data=args.workbook_data,
create_registration=args.create_pool_registration,
migrations=args.apply_migrations,
export_appinsights=args.export_appinsights,
log_service_principal=args.log_service_principal,
upgrade=args.upgrade,
)
if args.verbose:
level = logging.DEBUG

23
src/deployment/mypy.ini Normal file
View File

@ -0,0 +1,23 @@
[mypy]
disallow_untyped_defs = True
follow_imports = silent
check_untyped_defs = True
; disallow_any_generics = True
no_implicit_reexport = True
strict_optional = True
warn_redundant_casts = True
warn_return_any = True
warn_unused_configs = True
warn_unused_ignores = True
[mypy-azure.*]
ignore_missing_imports = True
[mypy-msrestazure.*]
ignore_missing_imports = True
[mypy-msrest.*]
ignore_missing_imports = True
[mypy-functional.*]
ignore_missing_imports = True

View File

@ -9,7 +9,7 @@ import time
import urllib.parse
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, NamedTuple, Optional, Tuple
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
from uuid import UUID, uuid4
import requests
@ -30,7 +30,7 @@ logger = logging.getLogger("deploy")
class GraphQueryError(Exception):
def __init__(self, message, status_code):
def __init__(self, message: str, status_code: int) -> None:
super(GraphQueryError, self).__init__(message)
self.status_code = status_code
@ -40,7 +40,7 @@ def query_microsoft_graph(
resource: str,
params: Optional[Dict] = None,
body: Optional[Dict] = None,
):
) -> Any:
profile = get_cli_profile()
(token_type, access_token, _), _, _ = profile.get_raw_token(
resource="https://graph.microsoft.com"
@ -139,7 +139,7 @@ def create_application_credential(application_name: str) -> str:
app: Application = apps[0]
(key, password) = add_application_password(app.object_id)
(_, password) = add_application_password(app.object_id)
return str(password)
@ -210,7 +210,7 @@ def create_application_registration(
return registered_app
def add_application_password(app_object_id: UUID) -> Optional[Tuple[str, str]]:
def add_application_password(app_object_id: UUID) -> Tuple[str, str]:
key = uuid4()
password_request = {
"passwordCredential": {
@ -232,10 +232,10 @@ def add_application_password(app_object_id: UUID) -> Optional[Tuple[str, str]]:
return (str(key), password["secretText"])
except GraphQueryError as err:
logger.warning("creating password failed : %s" % err)
None
raise err
def get_application(app_id: UUID) -> Optional[Dict]:
def get_application(app_id: UUID) -> Optional[Any]:
apps: Dict = query_microsoft_graph(
method="GET",
resource="applications",
@ -251,7 +251,7 @@ def authorize_application(
registration_app_id: UUID,
onefuzz_app_id: UUID,
permissions: List[str] = ["user_impersonation"],
):
) -> None:
onefuzz_app = get_application(onefuzz_app_id)
if onefuzz_app is None:
logger.error("Application '%s' not found" % onefuzz_app_id)
@ -290,7 +290,7 @@ def authorize_application(
def create_and_display_registration(
onefuzz_instance_name: str, registration_name: str, approle: OnefuzzAppRole
):
) -> None:
logger.info("Updating application registration")
application_info = register_application(
registration_name=registration_name,
@ -303,7 +303,7 @@ def create_and_display_registration(
logger.info("client_secret: %s" % application_info.client_secret)
def update_pool_registration(onefuzz_instance_name: str):
def update_pool_registration(onefuzz_instance_name: str) -> None:
create_and_display_registration(
onefuzz_instance_name,
"%s_pool" % onefuzz_instance_name,
@ -311,7 +311,7 @@ def update_pool_registration(onefuzz_instance_name: str):
)
def assign_scaleset_role(onefuzz_instance_name: str, scaleset_name: str):
def assign_scaleset_role(onefuzz_instance_name: str, scaleset_name: str) -> None:
""" Allows the nodes in the scaleset to access the service by assigning their managed identity to the ManagedNode Role """
onefuzz_service_appId = query_microsoft_graph(
@ -380,7 +380,7 @@ def assign_scaleset_role(onefuzz_instance_name: str, scaleset_name: str):
)
def main():
def main() -> None:
formatter = argparse.ArgumentDefaultsHelpFormatter
parent_parser = argparse.ArgumentParser(add_help=False)