Files
MOHPortalTest-AllAgents-All…/qwen/python/merchants_of_hope/api/v1/auth.py

73 lines
2.4 KiB
Python

"""
Authentication API routes
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ..config.settings import settings
from ..database import SessionLocal
from ..models import User
# Router on which the authentication endpoints of this module are registered.
router = APIRouter()
# HTTP Bearer scheme instance.
# NOTE(review): not referenced in the visible part of this module -- confirm
# whether other code imports it before removing.
security = HTTPBearer()
# Pydantic models for auth
class Token(BaseModel):
    """Response model of the ``/token`` endpoint."""

    access_token: str  # encoded JWT returned by create_access_token
    token_type: str  # the login route always sets this to "bearer"
class TokenData(BaseModel):
    """Optional username / tenant identifier pair.

    NOTE(review): not referenced in the visible part of this file; presumably
    meant to hold claims decoded from an access token -- confirm with callers.
    """

    username: Optional[str] = None
    tenant_id: Optional[str] = None
class UserLogin(BaseModel):
    """Credentials accepted by the ``/token`` login endpoint."""

    username: str  # matched against User.username
    password: str  # compared as-is with the stored value by the login route
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied first, so
            the caller's dict is never mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy, e.g. a
            zero ``timedelta``) the token expires after 15 minutes.

    Returns:
        The encoded token as returned by ``jwt.encode`` using
        ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(minutes=15)
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to misinterpret as
    # local time. The resulting instant is the same UTC moment.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def _get_db():
    """Yield a database session and close it once the request is finished."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: UserLogin, db: Session = Depends(_get_db)):
    """Validate the submitted credentials and issue a bearer access token.

    The session comes from ``_get_db`` rather than ``Depends(SessionLocal)``:
    using the session factory directly as a dependency never closes the
    session and lets FastAPI introspect the factory's own call signature.

    Args:
        form_data: Username and password submitted by the client.
        db: Request-scoped database session.

    Returns:
        A dict with ``access_token`` (the encoded JWT) and ``token_type``
        (always ``"bearer"``).

    Raises:
        HTTPException: 401 when the user is unknown, the password does not
            match, or the account is inactive.
    """
    # NOTE(review): this is a synchronous query inside an ``async def`` route,
    # so it blocks the event loop while the database call is in flight.
    user = db.query(User).filter(User.username == form_data.username).first()

    # SECURITY: this is a simplified check that compares the submitted
    # plaintext password directly with ``user.hashed_password``. A real
    # deployment must verify the password against a proper password hash.
    if not user or user.hashed_password != form_data.password:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "tenant_id": user.tenant_id},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.get("/oidc-config")
async def get_oidc_config():
    """Return the OIDC settings exposed to clients.

    Returns:
        A dict with the keys ``issuer``, ``client_id`` and ``redirect_uri``,
        each read from the application settings.
    """
    oidc_config = dict(
        issuer=settings.OIDC_ISSUER,
        client_id=settings.OIDC_CLIENT_ID,
        redirect_uri=settings.OIDC_REDIRECT_URI,
    )
    return oidc_config