Allow logged in user to change some of its data. Administrators can lock users using the is_active field.

This commit is contained in:
grossmj 2021-08-09 16:36:18 +09:30
parent 1f0ceb6f74
commit 6dd0f4d4d3
4 changed files with 101 additions and 6 deletions

View File

@ -98,13 +98,20 @@ async def get_logged_in_user(current_user: schemas.User = Depends(get_current_ac
return current_user
@router.get("/me", response_model=schemas.User)
async def get_logged_in_user(current_user: schemas.User = Depends(get_current_active_user)) -> schemas.User:
@router.put("/me", response_model=schemas.User)
async def update_logged_in_user(
user_update: schemas.LoggedInUserUpdate,
current_user: schemas.User = Depends(get_current_active_user),
users_repo: UsersRepository = Depends(get_repository(UsersRepository))
) -> schemas.User:
"""
Get the current active user.
Update the current active user.
"""
return current_user
if user_update.email and await users_repo.get_user_by_email(user_update.email):
raise ControllerBadRequestError(f"Email '{user_update.email}' is already registered")
return await users_repo.update_user(current_user.user_id, user_update)
@router.get("", response_model=List[schemas.User], dependencies=[Depends(get_current_active_user)])
@ -166,6 +173,12 @@ async def update_user(
Update an user.
"""
if user_update.username and await users_repo.get_user_by_username(user_update.username):
raise ControllerBadRequestError(f"Username '{user_update.username}' is already registered")
if user_update.email and await users_repo.get_user_by_email(user_update.email):
raise ControllerBadRequestError(f"Email '{user_update.email}' is already registered")
user = await users_repo.update_user(user_id, user_update)
if not user:
raise ControllerNotFoundError(f"User '{user_id}' not found")

View File

@ -27,7 +27,7 @@ from .controller.drawings import Drawing
from .controller.gns3vm import GNS3VM
from .controller.nodes import NodeCreate, NodeUpdate, NodeDuplicate, NodeCapture, Node
from .controller.projects import ProjectCreate, ProjectUpdate, ProjectDuplicate, Project, ProjectFile
from .controller.users import UserCreate, UserUpdate, User, Credentials, UserGroupCreate, UserGroupUpdate, UserGroup
from .controller.users import UserCreate, UserUpdate, LoggedInUserUpdate, User, Credentials, UserGroupCreate, UserGroupUpdate, UserGroup
from .controller.rbac import RoleCreate, RoleUpdate, Role, PermissionCreate, PermissionUpdate, Permission
from .controller.tokens import Token
from .controller.snapshots import SnapshotCreate, Snapshot

View File

@ -28,6 +28,7 @@ class UserBase(BaseModel):
"""
username: Optional[str] = Field(None, min_length=3, regex="[a-zA-Z0-9_-]+$")
is_active: bool = True
email: Optional[EmailStr]
full_name: Optional[str]
@ -49,11 +50,20 @@ class UserUpdate(UserBase):
password: Optional[SecretStr] = Field(None, min_length=6, max_length=100)
class LoggedInUserUpdate(BaseModel):
"""
Properties to update a logged in user.
"""
password: Optional[SecretStr] = Field(None, min_length=6, max_length=100)
email: Optional[EmailStr]
full_name: Optional[str]
class User(DateTimeModelMixin, UserBase):
user_id: UUID
last_login: Optional[datetime] = None
is_active: bool = True
is_superadmin: bool = False
class Config:

View File

@ -97,6 +97,38 @@ class TestUserRoutes:
response = await client.post(app.url_path_for("create_user"), json=new_user)
assert response.status_code == status_code
@pytest.mark.parametrize(
"attr, value, status_code",
(
("email", "user@email.com", status.HTTP_200_OK),
("email", "user@email.com", status.HTTP_400_BAD_REQUEST),
("username", "user2", status.HTTP_400_BAD_REQUEST),
("email", "invalid_email@one@two.io", status.HTTP_422_UNPROCESSABLE_ENTITY),
("password", "short", status.HTTP_422_UNPROCESSABLE_ENTITY),
("username", "user2@#$%^<>", status.HTTP_422_UNPROCESSABLE_ENTITY),
("username", "ab", status.HTTP_422_UNPROCESSABLE_ENTITY),
("full_name", "John Doe", status.HTTP_200_OK),
("password", "password123", status.HTTP_200_OK),
("is_active", True, status.HTTP_200_OK),
)
)
async def test_update_user(
self,
app: FastAPI,
client: AsyncClient,
db_session: AsyncSession,
attr: str,
value: str,
status_code: int,
) -> None:
user_repo = UsersRepository(db_session)
user_in_db = await user_repo.get_user_by_username("user2")
update_user = {}
update_user[attr] = value
response = await client.put(app.url_path_for("update_user", user_id=user_in_db.user_id), json=update_user)
assert response.status_code == status_code
async def test_users_saved_password_is_hashed(
self,
app: FastAPI,
@ -285,6 +317,46 @@ class TestUserMe:
response = await unauthorized_client.get(app.url_path_for("get_logged_in_user"))
assert response.status_code == status.HTTP_401_UNAUTHORIZED
async def test_authenticated_user_can_update_own_data(
self,
app: FastAPI,
authorized_client: AsyncClient,
test_user: User,
) -> None:
response = await authorized_client.get(app.url_path_for("get_logged_in_user"))
assert response.status_code == status.HTTP_200_OK
user = User(**response.json())
assert user.username == test_user.username
assert user.email == test_user.email
assert user.user_id == test_user.user_id
# logged in users can only change their email, full name and password
@pytest.mark.parametrize(
"attr, value, status_code",
(
("email", "user42@email.com", status.HTTP_200_OK),
("email", "user42@email.com", status.HTTP_400_BAD_REQUEST),
("full_name", "John Doe", status.HTTP_200_OK),
("password", "password123", status.HTTP_200_OK),
)
)
async def test_authenticated_user_can_update_own_data(
self,
app: FastAPI,
authorized_client: AsyncClient,
attr: str,
value: str,
status_code: int,
) -> None:
update_user = {}
update_user[attr] = value
response = await authorized_client.put(
app.url_path_for("update_logged_in_user"),
json=update_user
)
assert response.status_code == status_code
class TestSuperAdmin: