diff --git a/gns3server/api/routes/controller/users.py b/gns3server/api/routes/controller/users.py index 89d14275..2092b251 100644 --- a/gns3server/api/routes/controller/users.py +++ b/gns3server/api/routes/controller/users.py @@ -28,7 +28,7 @@ from gns3server import schemas from gns3server.controller.controller_error import ( ControllerBadRequestError, ControllerNotFoundError, - ControllerUnauthorizedError, + ControllerForbiddenError, ) from gns3server.db.repositories.users import UsersRepository @@ -110,8 +110,8 @@ async def delete_user( Delete an user. """ - if current_user.is_superuser: - raise ControllerUnauthorizedError("The super user cannot be deleted") + if current_user.is_superadmin: + raise ControllerForbiddenError("The super user cannot be deleted") success = await users_repo.delete_user(user_id) if not success: diff --git a/gns3server/db/models/users.py b/gns3server/db/models/users.py index 2dca0894..99108975 100644 --- a/gns3server/db/models/users.py +++ b/gns3server/db/models/users.py @@ -15,9 +15,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from sqlalchemy import Boolean, Column, String +from sqlalchemy import Boolean, Column, String, event from .base import BaseTable, generate_uuid, GUID +from gns3server.services import auth_service + +import logging + +log = logging.getLogger(__name__) class User(BaseTable): @@ -30,4 +35,18 @@ class User(BaseTable): full_name = Column(String) hashed_password = Column(String) is_active = Column(Boolean, default=True) - is_superuser = Column(Boolean, default=False) + is_superadmin = Column(Boolean, default=False) + +@event.listens_for(User.__table__, 'after_create') +def create_default_super_admin(target, connection, **kw): + + hashed_password = auth_service.hash_password("admin") + stmt = target.insert().values( + username="admin", + full_name="Super Administrator", + hashed_password=hashed_password, + is_superadmin=True + ) + connection.execute(stmt) + connection.commit() + log.info("Default super admin account added") diff --git a/gns3server/schemas/controller/users.py b/gns3server/schemas/controller/users.py index 1073311c..11aa100e 100644 --- a/gns3server/schemas/controller/users.py +++ b/gns3server/schemas/controller/users.py @@ -52,7 +52,7 @@ class User(DateTimeModelMixin, UserBase): user_id: UUID is_active: bool = True - is_superuser: bool = False + is_superadmin: bool = False class Config: orm_mode = True diff --git a/tests/api/routes/controller/test_users.py b/tests/api/routes/controller/test_users.py index f8ad92f0..6fbfc7cf 100644 --- a/tests/api/routes/controller/test_users.py +++ b/tests/api/routes/controller/test_users.py @@ -118,7 +118,7 @@ class TestUserRoutes: response = await client.get(app.url_path_for("get_users")) assert response.status_code == status.HTTP_200_OK - assert len(response.json()) == 3 # user1, user2 and user3 should exist + assert len(response.json()) == 4 # admin, user1, user2 and user3 should exist class TestAuthTokens: @@ -265,3 +265,30 @@ class TestUserMe: res = await client.get(app.url_path_for("get_current_active_user")) assert res.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestSuperAdmin: + + async def test_super_admin_exists( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession + ) -> None: + + user_repo = UsersRepository(db_session) + admin_in_db = await user_repo.get_user_by_username("admin") + assert admin_in_db is not None + assert auth_service.verify_password("admin", admin_in_db.hashed_password) + + async def test_cannot_delete_super_admin( + self, + app: FastAPI, + admin_client: AsyncClient, + db_session: AsyncSession + ) -> None: + + user_repo = UsersRepository(db_session) + admin_in_db = await user_repo.get_user_by_username("admin") + res = await admin_client.delete(app.url_path_for("delete_user", user_id=admin_in_db.user_id)) + assert res.status_code == status.HTTP_403_FORBIDDEN diff --git a/tests/conftest.py b/tests/conftest.py index f54c1422..b75cd8b2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -79,8 +79,12 @@ async def db_session(db_engine): # preferred and faster way would be to rollback the session/transaction # but it doesn't work for some reason async with db_engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await conn.run_sync(Base.metadata.create_all) + # Speed up tests by avoiding to hash the 'admin' password everytime the default super admin is added + # to the database using the "after_create" sqlalchemy event + hashed_password = "$2b$12$jPsNU9IS7.EWEqXahtDfo.26w6VLOLCuFEHKNvDpOjxs5e0WpqJfa" + with patch("gns3server.services.authentication.AuthService.hash_password", return_value=hashed_password): + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) session = AsyncSession(db_engine) try: @@ -152,6 +156,16 @@ def authorized_client(client: AsyncClient, test_user: User) -> AsyncClient: } return client +@pytest.fixture +async def admin_client(client: AsyncClient) -> AsyncClient: + + # user "admin" is automatically created when the users table is created + access_token = auth_service.create_access_token("admin") + client.headers = { + **client.headers, + "Authorization": f"Bearer {access_token}", + } + return client @pytest.fixture def controller_config_path(tmpdir):