Files
MOHPortalTest-AllAgents-All…/qwen/python/merchants_of_hope/api/v1/users.py
2025-10-24 17:06:14 -05:00

126 lines
3.7 KiB
Python

"""
Users API routes
"""
import hashlib
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Request
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ...database import SessionLocal
from ...models import User, UserRole, Tenant
from ...config.settings import settings
# Router on which the user endpoints declared in this module are registered
router = APIRouter()
# Pydantic models for users
class UserCreate(BaseModel):
    """Request body accepted by the create-user endpoint."""

    email: str
    username: str
    # Plain-text password; create_user hashes it before building the User row
    password: str
    # create_user stores ``role.value`` on the User row
    role: UserRole
class UserUpdate(BaseModel):
    """Request body for a partial user update.

    Every field is optional. A field that is omitted, or sent as ``null``,
    is ``None`` on the model, and update_user skips fields that are ``None``.
    """

    # Optional[...] makes each annotation agree with its None default, so an
    # explicit JSON null validates instead of being rejected as "not a str".
    email: Optional[str] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None
class UserResponse(BaseModel):
    """Response schema for a user as returned by the endpoints in this module.

    ``from_attributes`` is enabled so instances can be built from objects
    that expose these fields as attributes (the endpoints return ``User``
    rows). The password hash is not part of this schema.
    """

    id: int
    email: str
    username: str
    role: str
    is_active: bool
    is_verified: bool
    tenant_id: int

    # ``from_attributes`` / ``json_schema_extra`` are Pydantic v2 option names;
    # v2 configures models through ``model_config`` and deprecates the nested
    # ``class Config`` form. A plain dict is accepted here (ConfigDict is a
    # TypedDict), which avoids needing an extra import.
    model_config = {
        "from_attributes": True,
        "json_schema_extra": {
            "example": {
                "id": 1,
                "email": "user@example.com",
                "username": "johndoe",
                "role": "job_seeker",
                "is_active": True,
                "is_verified": True,
                "tenant_id": 1,
            }
        },
    }
def hash_password_util(password: str) -> str:
    """Return the hash of ``password`` as computed by ``get_password_hash``.

    The hashing helper is imported when this function is called rather than
    when the module is loaded.
    """
    # NOTE(review): the module-level imports use three dots (``...database``,
    # ``...models``) while this one uses two — confirm ``..utils`` is the
    # intended package level.
    from ..utils.security import get_password_hash as _hash_password

    hashed = _hash_password(password)
    return hashed
@router.get("/", response_model=List[UserResponse])
async def get_users(skip: int = 0, limit: int = 100, db: Session = Depends(SessionLocal)):
    """Return one page of users, ordered by id.

    Args:
        skip: Number of rows to skip from the start of the result set.
        limit: Maximum number of rows to return.
        db: Database session supplied by the dependency.

    Returns:
        The list of ``User`` rows for the requested page.

    Raises:
        HTTPException: 400 when ``skip`` or ``limit`` is negative.
    """
    # NOTE(review): the session comes from Depends(SessionLocal) and is not
    # closed in this function — confirm its lifecycle is handled elsewhere.

    # Negative paging values would otherwise reach the database unchecked.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so successive
    # pages could overlap or miss rows; order by primary key for stable pages.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: Session = Depends(SessionLocal)):
    """Look up a single user by id.

    Raises:
        HTTPException: 404 ("User not found") when no row has the given id.
    """
    by_id = db.query(User).filter(User.id == user_id)
    found = by_id.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(SessionLocal)):
    """Register a new user and return the stored row.

    The request is rejected when another user already has the same email or
    the same username. The password is hashed before it is stored.

    Raises:
        HTTPException: 400 when the email or username is already registered.
    """
    # Testing default: every new user is attached to tenant 1
    default_tenant_id = 1

    # Reject the request if either identifier is already taken
    duplicate_filter = (User.email == user.email) | (User.username == user.username)
    if db.query(User).filter(duplicate_filter).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email or username already registered",
        )

    # Build and persist the new row
    password_digest = hash_password_util(user.password)
    new_user = User(
        email=user.email,
        username=user.username,
        hashed_password=password_digest,
        role=user.role.value,
        tenant_id=default_tenant_id,
    )
    db.add(new_user)
    db.commit()
    # Reload the row so database-populated attributes are present on the object
    db.refresh(new_user)
    return new_user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate, db: Session = Depends(SessionLocal)):
    """Update a user's email, username and/or active flag.

    Only fields that are not ``None`` on ``user_update`` are written.

    Args:
        user_id: Id of the user to modify.
        user_update: New values; ``None`` means "leave unchanged".
        db: Database session supplied by the dependency.

    Returns:
        The refreshed ``User`` row.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the new
            email or username already belongs to a different user.
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # Apply the same uniqueness rule create_user enforces, so an update cannot
    # introduce a duplicate email/username. Compare with ``is None`` because
    # SQLAlchemy column expressions must not be evaluated for truthiness.
    conflict = None
    if user_update.email is not None:
        conflict = User.email == user_update.email
    if user_update.username is not None:
        same_username = User.username == user_update.username
        conflict = same_username if conflict is None else (conflict | same_username)
    if conflict is not None:
        # Exclude the row being edited: keeping one's own values is not a clash
        taken = db.query(User).filter(User.id != user_id, conflict).first()
        if taken is not None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email or username already registered",
            )

    # Update fields if provided
    if user_update.email is not None:
        db_user.email = user_update.email
    if user_update.username is not None:
        db_user.username = user_update.username
    if user_update.is_active is not None:
        db_user.is_active = user_update.is_active

    db.commit()
    db.refresh(db_user)
    return db_user
@router.delete("/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(SessionLocal)):
    """Remove the user with the given id.

    Returns:
        A confirmation message once the deletion has been committed.

    Raises:
        HTTPException: 404 ("User not found") when no row has the given id.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if target:
        db.delete(target)
        db.commit()
        return {"message": "User deleted successfully"}
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")