Provide the ability to create a new cli application registration (#236)

This commit is contained in:
Cheick Keita
2020-11-02 09:44:07 -08:00
committed by GitHub
parent 2d5cda79b3
commit e026e50582
2 changed files with 98 additions and 31 deletions

View File

@ -65,6 +65,7 @@ from registration import (
assign_scaleset_role,
authorize_application,
get_application,
OnefuzzAppRole,
register_application,
update_pool_registration,
)
@ -260,19 +261,19 @@ class Client:
app_roles = [
AppRole(
allowed_member_types=["Application"],
display_name="CliClient",
display_name=OnefuzzAppRole.CliClient.value,
id=str(uuid.uuid4()),
is_enabled=True,
description="Allows access from the CLI.",
value="CliClient",
value=OnefuzzAppRole.CliClient.value,
),
AppRole(
allowed_member_types=["Application"],
display_name="ManagedNode",
display_name=OnefuzzAppRole.ManagedNode.value,
id=str(uuid.uuid4()),
is_enabled=True,
description="Allow access from a lab machine.",
value="ManagedNode",
value=OnefuzzAppRole.ManagedNode.value,
),
]
@ -340,7 +341,9 @@ class Client:
"Could not find the default CLI application under the current "
"subscription, creating a new one"
)
app_info = register_application("onefuzz-cli", self.application_name)
app_info = register_application(
"onefuzz-cli", self.application_name, OnefuzzAppRole.CliClient
)
self.cli_config = {
"client_id": app_info.client_id,
"authority": app_info.authority,

View File

@ -5,8 +5,10 @@
import argparse
import logging
import time
import urllib.parse
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, NamedTuple, Optional, Tuple
from uuid import UUID, uuid4
@ -17,6 +19,7 @@ from azure.graphrbac import GraphRbacManagementClient
from azure.graphrbac.models import (
Application,
ApplicationCreateParameters,
AppRole,
RequiredResourceAccess,
ResourceAccess,
)
@ -27,7 +30,13 @@ logger = logging.getLogger("deploy")
class GraphQueryError(Exception):
pass
def __init__(self, message, status_code):
super(GraphQueryError, self).__init__(message)
self.status_code = status_code
@property
def status_code():
return self.status_code
def query_microsoft_graph(
@ -59,7 +68,9 @@ def query_microsoft_graph(
else:
error_text = str(response.content, encoding="utf-8", errors="backslashreplace")
raise GraphQueryError(
"request did not succeed: HTTP %s - %s" % (response.status_code, error_text)
"request did not succeed: HTTP %s - %s"
% (response.status_code, error_text),
response.status_code,
)
@ -69,20 +80,31 @@ class ApplicationInfo(NamedTuple):
authority: str
class OnefuzzAppRole(Enum):
ManagedNode = "ManagedNode"
CliClient = "CliClient"
def register_application(
registration_name: str, onefuzz_instance_name: str
registration_name: str, onefuzz_instance_name: str, approle: OnefuzzAppRole
) -> ApplicationInfo:
logger.debug("retrieving the application registration")
logger.info("retrieving the application registration %s" % registration_name)
client = get_client_from_cli_profile(GraphRbacManagementClient)
apps: List[Application] = list(
client.applications.list(filter="displayName eq '%s'" % registration_name)
)
if len(apps) == 0:
logger.debug("No existing registration found. creating a new one")
app = create_application_registration(onefuzz_instance_name, registration_name)
logger.info("No existing registration found. creating a new one")
app = create_application_registration(
onefuzz_instance_name, registration_name, approle
)
else:
app = apps[0]
logger.info(
"Found existing application objectId '%s' - appid '%s"
% (app.object_id, app.app_id)
)
onefuzz_apps: List[Application] = list(
client.applications.list(filter="displayName eq '%s'" % onefuzz_instance_name)
@ -113,6 +135,7 @@ def register_application(
def create_application_credential(application_name: str) -> str:
""" Add a new password to the application registration """
logger.info("creating application credential for '%s'" % application_name)
client = get_client_from_cli_profile(GraphRbacManagementClient)
apps: List[Application] = list(
client.applications.list(filter="displayName eq '%s'" % application_name)
@ -125,7 +148,7 @@ def create_application_credential(application_name: str) -> str:
def create_application_registration(
onefuzz_instance_name: str, name: str
onefuzz_instance_name: str, name: str, approle: OnefuzzAppRole
) -> Application:
""" Create an application registration """
@ -136,7 +159,9 @@ def create_application_registration(
app = apps[0]
resource_access = [
ResourceAccess(id=role.id, type="Role") for role in app.app_roles
ResourceAccess(id=role.id, type="Role")
for role in app.app_roles
if role.value == approle.value
]
params = ApplicationCreateParameters(
@ -158,16 +183,33 @@ def create_application_registration(
registered_app: Application = client.applications.create(params)
atttempts = 5
while True:
if atttempts < 0:
raise Exception(
"Unable to create application registration, Please try again"
)
atttempts = atttempts - 1
try:
time.sleep(5)
query_microsoft_graph(
method="PATCH",
resource="applications/%s" % registered_app.object_id,
body={
"publicClient": {
"redirectUris": ["https://%s.azurewebsites.net" % onefuzz_instance_name]
"redirectUris": [
"https://%s.azurewebsites.net" % onefuzz_instance_name
]
},
"isFallbackPublicClient": True,
},
)
break
except GraphQueryError as err:
if err.status_code == 404:
continue
authorize_application(UUID(registered_app.app_id), UUID(app.app_id))
return registered_app
@ -250,12 +292,14 @@ def authorize_application(
)
def update_pool_registration(application_name: str):
def create_and_display_registration(
onefuzz_instance_name: str, registration_name: str, approle: OnefuzzAppRole
):
logger.info("Updating application registration")
application_info = register_application(
registration_name=("%s_pool" % application_name),
onefuzz_instance_name=application_name,
registration_name=registration_name,
onefuzz_instance_name=onefuzz_instance_name,
approle=approle,
)
logger.info("Registration complete")
logger.info("These generated credentials are valid for a year")
@ -263,6 +307,14 @@ def update_pool_registration(application_name: str):
logger.info("client_secret: %s" % application_info.client_secret)
def update_pool_registration(onefuzz_instance_name: str):
create_and_display_registration(
onefuzz_instance_name,
"%s_pool" % onefuzz_instance_name,
OnefuzzAppRole.ManagedNode,
)
def assign_scaleset_role(onefuzz_instance_name: str, scaleset_name: str):
""" Allows the nodes in the scaleset to access the service by assigning their managed identity to the ManagedNode Role """
@ -300,7 +352,7 @@ def assign_scaleset_role(onefuzz_instance_name: str, scaleset_name: str):
managed_node_role = (
seq(onefuzz_service_principal["appRoles"])
.filter(lambda x: x["value"] == "ManagedNode")
.filter(lambda x: x["value"] == OnefuzzAppRole.ManagedNode.value)
.head_option()
)
@ -359,6 +411,12 @@ def main():
"scaleset_name",
help="the name of the scaleset",
)
cli_registration_parser = subparsers.add_parser(
"create_cli_registration", parents=[parent_parser]
)
cli_registration_parser.add_argument(
"--registration_name", help="the name of the cli registration"
)
args = parser.parse_args()
if args.verbose:
@ -369,10 +427,16 @@ def main():
logging.basicConfig(format="%(levelname)s:%(message)s", level=level)
logging.getLogger("deploy").setLevel(logging.INFO)
onefuzz_instance_name = args.onefuzz_instance
if args.command == "update_pool_registration":
update_pool_registration(args.onefuzz_instance)
update_pool_registration(onefuzz_instance_name)
elif args.command == "create_cli_registration":
registration_name = args.registration_name or ("%s_cli" % onefuzz_instance_name)
create_and_display_registration(
onefuzz_instance_name, registration_name, OnefuzzAppRole.CliClient
)
elif args.command == "assign_scaleset_role":
assign_scaleset_role(args.onefuzz_instance, args.scaleset_name)
assign_scaleset_role(onefuzz_instance_name, args.scaleset_name)
else:
raise Exception("invalid arguments")