mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-20 21:35:42 +00:00
@ -39,25 +39,53 @@ def is_agent(token_data: UserInfo) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def call_if_agent(
|
||||
req: func.HttpRequest, method: Callable[[func.HttpRequest], func.HttpResponse]
|
||||
) -> func.HttpResponse:
|
||||
def is_user(token_data: UserInfo) -> bool:
|
||||
return not is_agent(token_data)
|
||||
|
||||
|
||||
def reject(req: func.HttpRequest, token: UserInfo) -> func.HttpResponse:
|
||||
logging.error(
|
||||
"reject token. url:%s token:%s body:%s",
|
||||
repr(req.url),
|
||||
repr(token),
|
||||
repr(req.get_body()),
|
||||
)
|
||||
return not_ok(
|
||||
Error(code=ErrorCode.UNAUTHORIZED, errors=["Unrecognized agent"]),
|
||||
status_code=401,
|
||||
context="token verification",
|
||||
)
|
||||
|
||||
|
||||
def call_if(
|
||||
req: func.HttpRequest,
|
||||
method: Callable[[func.HttpRequest], func.HttpResponse],
|
||||
*,
|
||||
allow_user: bool = False,
|
||||
allow_agent: bool = False
|
||||
) -> func.HttpResponse:
|
||||
token = parse_jwt_token(req)
|
||||
if isinstance(token, Error):
|
||||
return not_ok(token, status_code=401, context="token verification")
|
||||
|
||||
if not is_agent(token):
|
||||
logging.error(
|
||||
"rejecting token url:%s token:%s body:%s",
|
||||
repr(req.url),
|
||||
repr(token),
|
||||
repr(req.get_body()),
|
||||
)
|
||||
return not_ok(
|
||||
Error(code=ErrorCode.UNAUTHORIZED, errors=["Unrecognized agent"]),
|
||||
status_code=401,
|
||||
context="token verification",
|
||||
)
|
||||
if is_user(token) and not allow_user:
|
||||
return reject(req, token)
|
||||
|
||||
if is_agent(token) and not allow_agent:
|
||||
return reject(req, token)
|
||||
|
||||
return method(req)
|
||||
|
||||
|
||||
def call_if_user(
|
||||
req: func.HttpRequest, method: Callable[[func.HttpRequest], func.HttpResponse]
|
||||
) -> func.HttpResponse:
|
||||
|
||||
return call_if(req, method, allow_user=True)
|
||||
|
||||
|
||||
def call_if_agent(
|
||||
req: func.HttpRequest, method: Callable[[func.HttpRequest], func.HttpResponse]
|
||||
) -> func.HttpResponse:
|
||||
|
||||
return call_if(req, method, allow_agent=True)
|
Reference in New Issue
Block a user