the beginning of the idiots
This commit is contained in:
		
							
								
								
									
										73
									
								
								qwen/python/merchants_of_hope/api/v1/auth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								qwen/python/merchants_of_hope/api/v1/auth.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,73 @@
 | 
			
		||||
"""
 | 
			
		||||
Authentication API routes
 | 
			
		||||
"""
 | 
			
		||||
from fastapi import APIRouter, Depends, HTTPException, status
 | 
			
		||||
from fastapi.security import HTTPBearer
 | 
			
		||||
from datetime import datetime, timedelta
 | 
			
		||||
from typing import Optional
 | 
			
		||||
import jwt
 | 
			
		||||
from pydantic import BaseModel
 | 
			
		||||
from sqlalchemy.orm import Session
 | 
			
		||||
 | 
			
		||||
from ..config.settings import settings
 | 
			
		||||
from ..database import SessionLocal
 | 
			
		||||
from ..models import User
 | 
			
		||||
 | 
			
		||||
router = APIRouter()
 | 
			
		||||
security = HTTPBearer()
 | 
			
		||||
 | 
			
		||||
# Pydantic models for auth
 | 
			
		||||
class Token(BaseModel):
 | 
			
		||||
    access_token: str
 | 
			
		||||
    token_type: str
 | 
			
		||||
 | 
			
		||||
class TokenData(BaseModel):
 | 
			
		||||
    username: Optional[str] = None
 | 
			
		||||
    tenant_id: Optional[str] = None
 | 
			
		||||
 | 
			
		||||
class UserLogin(BaseModel):
 | 
			
		||||
    username: str
 | 
			
		||||
    password: str
 | 
			
		||||
 | 
			
		||||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
 | 
			
		||||
    to_encode = data.copy()
 | 
			
		||||
    if expires_delta:
 | 
			
		||||
        expire = datetime.utcnow() + expires_delta
 | 
			
		||||
    else:
 | 
			
		||||
        expire = datetime.utcnow() + timedelta(minutes=15)
 | 
			
		||||
    to_encode.update({"exp": expire})
 | 
			
		||||
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
 | 
			
		||||
    return encoded_jwt
 | 
			
		||||
 | 
			
		||||
@router.post("/token", response_model=Token)
 | 
			
		||||
async def login_for_access_token(form_data: UserLogin, db: Session = Depends(SessionLocal)):
 | 
			
		||||
    # This is a simplified version - in a real app, you'd hash passwords
 | 
			
		||||
    user = db.query(User).filter(User.username == form_data.username).first()
 | 
			
		||||
    if not user or user.hashed_password != form_data.password:  # Simplified check
 | 
			
		||||
        raise HTTPException(
 | 
			
		||||
            status_code=status.HTTP_401_UNAUTHORIZED,
 | 
			
		||||
            detail="Incorrect username or password",
 | 
			
		||||
            headers={"WWW-Authenticate": "Bearer"},
 | 
			
		||||
        )
 | 
			
		||||
    if not user.is_active:
 | 
			
		||||
        raise HTTPException(
 | 
			
		||||
            status_code=status.HTTP_401_UNAUTHORIZED,
 | 
			
		||||
            detail="Inactive user",
 | 
			
		||||
            headers={"WWW-Authenticate": "Bearer"},
 | 
			
		||||
        )
 | 
			
		||||
    
 | 
			
		||||
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
 | 
			
		||||
    access_token = create_access_token(
 | 
			
		||||
        data={"sub": user.username, "tenant_id": user.tenant_id},
 | 
			
		||||
        expires_delta=access_token_expires
 | 
			
		||||
    )
 | 
			
		||||
    return {"access_token": access_token, "token_type": "bearer"}
 | 
			
		||||
 | 
			
		||||
@router.get("/oidc-config")
 | 
			
		||||
async def get_oidc_config():
 | 
			
		||||
    """Get OIDC configuration"""
 | 
			
		||||
    return {
 | 
			
		||||
        "issuer": settings.OIDC_ISSUER,
 | 
			
		||||
        "client_id": settings.OIDC_CLIENT_ID,
 | 
			
		||||
        "redirect_uri": settings.OIDC_REDIRECT_URI
 | 
			
		||||
    }
 | 
			
		||||
		Reference in New Issue
	
	Block a user