Add a registration command to allow an application registration to connect to the Onefuzz (#2571)

This commit is contained in:
Cheick Keita
2022-11-03 09:41:38 -07:00
committed by GitHub
parent 6cc7caf67b
commit fb8a4e36f1
2 changed files with 98 additions and 26 deletions

View File

@ -383,7 +383,7 @@ class Client:
}, },
{ {
"allowedMemberTypes": ["Application"], "allowedMemberTypes": ["Application"],
"description": "Allow access from a lab machine.", "description": "Allow access from a managed node.",
"displayName": OnefuzzAppRole.ManagedNode.value, "displayName": OnefuzzAppRole.ManagedNode.value,
"id": str(uuid.uuid4()), "id": str(uuid.uuid4()),
"isEnabled": True, "isEnabled": True,
@ -397,6 +397,14 @@ class Client:
"isEnabled": True, "isEnabled": True,
"value": OnefuzzAppRole.UserAssignment.value, "value": OnefuzzAppRole.UserAssignment.value,
}, },
{
"allowedMemberTypes": ["Application"],
"description": "Allow access from an unmanaged node.",
"displayName": OnefuzzAppRole.UnmanagedNode.value,
"id": str(uuid.uuid4()),
"isEnabled": True,
"value": OnefuzzAppRole.UnmanagedNode.value,
},
] ]
if not app: if not app:

View File

@ -10,7 +10,17 @@ import time
import urllib.parse import urllib.parse
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, TypeVar from typing import (
Any,
Callable,
Dict,
List,
NamedTuple,
Optional,
Tuple,
TypeVar,
Union,
)
from uuid import UUID from uuid import UUID
import requests import requests
@ -24,6 +34,8 @@ logger = logging.getLogger("deploy")
GRAPH_RESOURCE = "https://graph.microsoft.com" GRAPH_RESOURCE = "https://graph.microsoft.com"
GRAPH_RESOURCE_ENDPOINT = "https://graph.microsoft.com/v1.0" GRAPH_RESOURCE_ENDPOINT = "https://graph.microsoft.com/v1.0"
NameOrAppId = Union[str, UUID]
class GraphQueryError(Exception): class GraphQueryError(Exception):
def __init__(self, message: str, status_code: Optional[int]) -> None: def __init__(self, message: str, status_code: Optional[int]) -> None:
@ -117,6 +129,7 @@ def retry(
data: Any = None, data: Any = None,
) -> OperationResult: ) -> OperationResult:
count = 0 count = 0
error = None
while True: while True:
try: try:
return operation(data) return operation(data)
@ -160,6 +173,7 @@ class OnefuzzAppRole(Enum):
ManagedNode = "ManagedNode" ManagedNode = "ManagedNode"
CliClient = "CliClient" CliClient = "CliClient"
UserAssignment = "UserAssignment" UserAssignment = "UserAssignment"
UnmanagedNode = "UnmanagedNode"
def register_application( def register_application(
@ -309,22 +323,32 @@ def create_application_registration(
registered_app_id = registered_app["appId"] registered_app_id = registered_app["appId"]
app_id = app["appId"] app_id = app["appId"]
authorize_and_assign_role(app_id, registered_app_id, approle, subscription_id)
return registered_app
def authorize_and_assign_role(
onfuzz_app_id: UUID,
registered_app_id: UUID,
role: OnefuzzAppRole,
subscription_id: str,
) -> None:
def try_authorize_application(data: Any) -> None: def try_authorize_application(data: Any) -> None:
authorize_application( authorize_application(
UUID(registered_app_id), registered_app_id,
UUID(app_id), onfuzz_app_id,
subscription_id=subscription_id, subscription_id=subscription_id,
) )
retry(try_authorize_application, "authorize application") retry(try_authorize_application, "authorize application")
def try_assign_instance_role(data: Any) -> None: def try_assign_instance_role(data: Any) -> None:
assign_instance_app_role(onefuzz_instance_name, name, subscription_id, approle) assign_instance_app_role(
onfuzz_app_id, registered_app_id, subscription_id, role
)
retry(try_assign_instance_role, "assingn role") retry(try_assign_instance_role, "assingn role")
return registered_app
def add_application_password( def add_application_password(
password_name: str, app_object_id: UUID, subscription_id: str password_name: str, app_object_id: UUID, subscription_id: str
@ -422,6 +446,10 @@ def authorize_application(
permissions: List[str] = ["user_impersonation"], permissions: List[str] = ["user_impersonation"],
subscription_id: Optional[str] = None, subscription_id: Optional[str] = None,
) -> None: ) -> None:
logger.info(
f"authorizing registration {registration_app_id} to access application {onefuzz_app_id} with the permissions '{', '.join(permissions)}'"
)
onefuzz_app = get_application( onefuzz_app = get_application(
app_id=onefuzz_app_id, subscription_id=subscription_id app_id=onefuzz_app_id, subscription_id=subscription_id
) )
@ -575,8 +603,8 @@ def assign_app_role(
def assign_instance_app_role( def assign_instance_app_role(
onefuzz_instance_name: str, onefuzz_instance: NameOrAppId,
application_name: str, application_name: NameOrAppId,
subscription_id: str, subscription_id: str,
app_role: OnefuzzAppRole, app_role: OnefuzzAppRole,
) -> None: ) -> None:
@ -585,18 +613,25 @@ def assign_instance_app_role(
their managed identity to the provided App Role their managed identity to the provided App Role
""" """
onefuzz_service_appIds = query_microsoft_graph_list( logger.info(
method="GET", f"Assigning app role {app_role} from {onefuzz_instance} to {application_name}"
resource="applications",
params={
"$filter": "displayName eq '%s'" % onefuzz_instance_name,
"$select": "appId",
},
subscription=subscription_id,
) )
if len(onefuzz_service_appIds) == 0: if isinstance(onefuzz_instance, str):
raise Exception("onefuzz app registration not found") onefuzz_service_appIds = query_microsoft_graph_list(
appId = onefuzz_service_appIds[0]["appId"] method="GET",
resource="applications",
params={
"$filter": "displayName eq '%s'" % onefuzz_instance,
"$select": "appId",
},
subscription=subscription_id,
)
if len(onefuzz_service_appIds) == 0:
raise Exception("onefuzz app registration not found")
appId = onefuzz_service_appIds[0]["appId"]
else:
appId = onefuzz_instance
onefuzz_service_principals = query_microsoft_graph_list( onefuzz_service_principals = query_microsoft_graph_list(
method="GET", method="GET",
resource="servicePrincipals", resource="servicePrincipals",
@ -607,15 +642,27 @@ def assign_instance_app_role(
if len(onefuzz_service_principals) == 0: if len(onefuzz_service_principals) == 0:
raise Exception("onefuzz app service principal not found") raise Exception("onefuzz app service principal not found")
onefuzz_service_principal = onefuzz_service_principals[0] onefuzz_service_principal = onefuzz_service_principals[0]
application_service_principals = query_microsoft_graph_list(
method="GET", if isinstance(application_name, str):
resource="servicePrincipals", application_service_principals = query_microsoft_graph_list(
params={"$filter": "displayName eq '%s'" % application_name}, method="GET",
subscription=subscription_id, resource="servicePrincipals",
) params={"$filter": "displayName eq '%s'" % application_name},
subscription=subscription_id,
)
else:
application_service_principals = query_microsoft_graph_list(
method="GET",
resource="servicePrincipals",
params={"$filter": "appId eq '%s'" % application_name},
subscription=subscription_id,
)
if len(application_service_principals) == 0: if len(application_service_principals) == 0:
raise Exception(f"application '{application_name}' service principal not found") raise Exception(f"application '{application_name}' service principal not found")
application_service_principal = application_service_principals[0] application_service_principal = application_service_principals[0]
managed_node_role = ( managed_node_role = (
seq(onefuzz_service_principal["appRoles"]) seq(onefuzz_service_principal["appRoles"])
.filter(lambda x: x["value"] == app_role.value) .filter(lambda x: x["value"] == app_role.value)
@ -799,6 +846,12 @@ def main() -> None:
cli_registration_parser.add_argument( cli_registration_parser.add_argument(
"--registration_name", help="the name of the cli registration" "--registration_name", help="the name of the cli registration"
) )
register_app_parser = subparsers.add_parser("register_app", parents=[parent_parser])
register_app_parser.add_argument("--app_id", help="the application id to register")
register_app_parser.add_argument(
"--role",
help=f"the role of the application to register. Valid values: {', '.join([member.value for member in OnefuzzAppRole])}",
)
args = parser.parse_args() args = parser.parse_args()
if args.verbose: if args.verbose:
@ -821,6 +874,17 @@ def main() -> None:
args.subscription_id, args.subscription_id,
display_secret=True, display_secret=True,
) )
elif args.command == "register_app":
onefuzz_app_id = get_application(display_name=onefuzz_instance_name)
if not onefuzz_app_id:
raise Exception("could not find onefuzz application")
authorize_and_assign_role(
UUID(onefuzz_app_id["appId"]),
UUID(args.app_id),
OnefuzzAppRole(args.role),
args.subscription_id,
)
elif args.command == "assign_scaleset_role": elif args.command == "assign_scaleset_role":
assign_instance_app_role( assign_instance_app_role(
onefuzz_instance_name, onefuzz_instance_name,