Use broker or browser login instead of device flow (#2612)

Update CLI to attempt broker or browser-based authentication first; if you `Ctrl-C` to cancel it, you can fall back to device code login.

Also updated the MSAL dependency to latest version and pass `allow_broker=True` which will allow the use of Web Account Manager (WAM), if it is available.

Using browser auth requires the `http://localhost` redirect URI, and using the broker requires a special custom URI including the app ID (see code).
This commit is contained in:
George Pollard
2022-11-15 09:13:36 +13:00
committed by GitHub
parent 0caac2fc00
commit c5840eb69b
4 changed files with 221 additions and 155 deletions

View File

@ -17,7 +17,7 @@ import time
import uuid
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union, cast
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from uuid import UUID
from azure.common.credentials import get_cli_profile
@ -408,135 +408,9 @@ class Client:
]
if not app:
logger.info("creating Application registration")
params = {
"displayName": self.application_name,
"identifierUris": self.get_identifier_urls(),
"signInAudience": self.get_signin_audience(),
"appRoles": app_roles,
"api": {
"oauth2PermissionScopes": [
{
"adminConsentDescription": f"Allow the application to access {self.application_name} on behalf of the signed-in user.",
"adminConsentDisplayName": f"Access {self.application_name}",
"id": str(uuid.uuid4()),
"isEnabled": True,
"type": "User",
"userConsentDescription": f"Allow the application to access {self.application_name} on your behalf.",
"userConsentDisplayName": f"Access {self.application_name}",
"value": "user_impersonation",
}
]
},
"web": {
"implicitGrantSettings": {
"enableAccessTokenIssuance": False,
"enableIdTokenIssuance": True,
},
"redirectUris": [
f"{url}/.auth/login/aad/callback"
for url in self.get_instance_urls()
],
},
"requiredResourceAccess": [
{
"resourceAccess": [
{"id": USER_READ_PERMISSION, "type": "Scope"}
],
"resourceAppId": MICROSOFT_GRAPH_APP_ID,
}
],
}
app = query_microsoft_graph(
method="POST",
resource="applications",
body=params,
subscription=self.get_subscription_id(),
)
logger.info("creating service principal")
service_principal_params = {
"accountEnabled": True,
"appRoleAssignmentRequired": True,
"servicePrincipalType": "Application",
"appId": app["appId"],
}
def try_sp_create() -> None:
error: Optional[Exception] = None
for _ in range(10):
try:
query_microsoft_graph(
method="POST",
resource="servicePrincipals",
body=service_principal_params,
subscription=self.get_subscription_id(),
)
return
except GraphQueryError as err:
# work around timing issue when creating service principal
# https://github.com/Azure/azure-cli/issues/14767
if (
"service principal being created must in the local tenant"
not in str(err)
):
raise err
logger.warning(
"creating service principal failed with an error that occurs "
"due to AAD race conditions"
)
time.sleep(60)
if error is None:
raise Exception("service principal creation failed")
else:
raise error
try_sp_create()
app = self.create_new_app_registration(app_roles)
else:
identifier_uris: List[str] = app["identifierUris"]
api_ids = [
id for id in self.get_identifier_urls() if id not in identifier_uris
]
if len(api_ids) > 0:
identifier_uris.extend(api_ids)
query_microsoft_graph(
method="PATCH",
resource=f"applications/{app['id']}",
body={"identifierUris": identifier_uris},
subscription=self.get_subscription_id(),
)
existing_role_values = [app_role["value"] for app_role in app["appRoles"]]
has_missing_roles = any(
[role["value"] not in existing_role_values for role in app_roles]
)
if has_missing_roles:
# disabling the existing app role first to allow the update
# this is a requirement to update the application roles
for role in app["appRoles"]:
role["isEnabled"] = False
query_microsoft_graph(
method="PATCH",
resource=f"applications/{app['id']}",
body={"appRoles": app["appRoles"]},
subscription=self.get_subscription_id(),
)
# overriding the list of app roles
query_microsoft_graph(
method="PATCH",
resource=f"applications/{app['id']}",
body={"appRoles": app_roles},
subscription=self.get_subscription_id(),
)
self.update_existing_app_registration(app, app_roles)
if self.multi_tenant_domain and app["signInAudience"] == "AzureADMyOrg":
set_app_audience(
@ -602,6 +476,31 @@ class Client:
"client_id": onefuzz_cli_app["appId"],
"authority": authority,
}
# ensure replyURLs is set properly
if "publicClient" not in onefuzz_cli_app:
onefuzz_cli_app["publicClient"] = {}
if "redirectUris" not in onefuzz_cli_app["publicClient"]:
onefuzz_cli_app["publicClient"]["redirectUris"] = []
requiredRedirectUris = [
"http://localhost", # required for browser-based auth
f"ms-appx-web://Microsoft.AAD.BrokerPlugin/{onefuzz_cli_app['appId']}", # required for broker auth
]
redirectUris: List[str] = onefuzz_cli_app["publicClient"]["redirectUris"]
updatedRedirectUris = list(set(requiredRedirectUris) | set(redirectUris))
if len(updatedRedirectUris) > len(redirectUris):
logger.info("Updating redirectUris for CLI app")
query_microsoft_graph(
method="PATCH",
resource=f"applications/{onefuzz_cli_app['id']}",
body={"publicClient": {"redirectUris": updatedRedirectUris}},
subscription=self.get_subscription_id(),
)
assign_instance_app_role(
self.application_name,
onefuzz_cli_app["displayName"],
@ -612,6 +511,145 @@ class Client:
self.results["client_id"] = app["appId"]
self.results["client_secret"] = password
def update_existing_app_registration(
self, app: Dict[str, Any], app_roles: List[Dict[str, Any]]
) -> None:
logger.info("updating Application registration")
update_properties: Dict[str, Any] = {}
# find any identifier URIs that need updating
identifier_uris: List[str] = app["identifierUris"]
updated_identifier_uris = list(
set(identifier_uris) | set(self.get_identifier_urls())
)
if len(updated_identifier_uris) > len(identifier_uris):
update_properties["identifierUris"] = updated_identifier_uris
# find any roles that need updating
existing_role_values: List[str] = [
app_role["value"] for app_role in app["appRoles"]
]
has_missing_roles = any(
[role["value"] not in existing_role_values for role in app_roles]
)
if has_missing_roles:
# disabling the existing app role first to allow the update
# this is a requirement to update the application roles
for role in app["appRoles"]:
role["isEnabled"] = False
query_microsoft_graph(
method="PATCH",
resource=f"applications/{app['id']}",
body={"appRoles": app["appRoles"]},
subscription=self.get_subscription_id(),
)
update_properties["appRoles"] = app_roles
if len(update_properties) > 0:
logger.info(
"- updating app registration properties: {}".format(
", ".join(update_properties.keys())
)
)
query_microsoft_graph(
method="PATCH",
resource=f"applications/{app['id']}",
body=update_properties,
subscription=self.get_subscription_id(),
)
def create_new_app_registration(
self, app_roles: List[Dict[str, Any]]
) -> Dict[str, Any]:
logger.info("creating Application registration")
params = {
"displayName": self.application_name,
"identifierUris": self.get_identifier_urls(),
"signInAudience": self.get_signin_audience(),
"appRoles": app_roles,
"api": {
"oauth2PermissionScopes": [
{
"adminConsentDescription": f"Allow the application to access {self.application_name} on behalf of the signed-in user.",
"adminConsentDisplayName": f"Access {self.application_name}",
"id": str(uuid.uuid4()),
"isEnabled": True,
"type": "User",
"userConsentDescription": f"Allow the application to access {self.application_name} on your behalf.",
"userConsentDisplayName": f"Access {self.application_name}",
"value": "user_impersonation",
}
]
},
"web": {
"implicitGrantSettings": {
"enableAccessTokenIssuance": False,
"enableIdTokenIssuance": True,
},
"redirectUris": [
f"{url}/.auth/login/aad/callback"
for url in self.get_instance_urls()
],
},
"requiredResourceAccess": [
{
"resourceAccess": [{"id": USER_READ_PERMISSION, "type": "Scope"}],
"resourceAppId": MICROSOFT_GRAPH_APP_ID,
}
],
}
app = query_microsoft_graph(
method="POST",
resource="applications",
body=params,
subscription=self.get_subscription_id(),
)
logger.info("creating service principal")
service_principal_params = {
"accountEnabled": True,
"appRoleAssignmentRequired": True,
"servicePrincipalType": "Application",
"appId": app["appId"],
}
def try_sp_create() -> None:
error: Optional[Exception] = None
for _ in range(10):
try:
query_microsoft_graph(
method="POST",
resource="servicePrincipals",
body=service_principal_params,
subscription=self.get_subscription_id(),
)
return
except GraphQueryError as err:
# work around timing issue when creating service principal
# https://github.com/Azure/azure-cli/issues/14767
if (
"service principal being created must in the local tenant"
not in str(err)
):
raise err
logger.warning(
"creating service principal failed with an error that occurs "
"due to AAD race conditions"
)
time.sleep(60)
if error is None:
raise Exception("service principal creation failed")
else:
raise error
try_sp_create()
return app
def deploy_template(self) -> None:
logger.info("deploying arm template: %s", self.arm_template)