Files
MOHPortalTest-AllAgents-All…/qwen/python/merchants_of_hope/utils/security.py
2025-10-24 16:29:40 -05:00

132 lines
4.3 KiB
Python

"""
Security utilities for the application
"""
import html
import re
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import HTTPException, status
from passlib.context import CryptContext

from ..config.settings import settings
# Shared passlib hashing context, configured with bcrypt as its only scheme.
# verify_password() and get_password_hash() both delegate to this instance.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check whether ``plain_password`` matches the stored ``hashed_password``.

    The comparison is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def generate_secure_token(length: int = 32) -> str:
    """Return a random URL-safe token produced by the ``secrets`` module.

    ``length`` is forwarded to ``secrets.token_urlsafe`` as its byte count,
    so the returned text is longer than ``length`` characters.
    """
    token = secrets.token_urlsafe(length)
    return token
def validate_email(email: str) -> bool:
    """Return True when ``email`` has the shape ``local@domain.tld``.

    The local part may contain ASCII letters, digits and ``._%+-``; the
    domain may contain ASCII letters, digits, dots and hyphens and must end
    in a dot followed by at least two ASCII letters.

    Args:
        email: Candidate address. Non-string values are treated as invalid.

    Returns:
        True if the whole string matches the pattern, otherwise False.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch is required here: with re.match, a trailing "$" also matches
    # just before a final newline, so "user@example.com\n" would be accepted.
    return re.fullmatch(pattern, email) is not None
def sanitize_input(input_str: str) -> str:
    """Escape HTML-significant characters in ``input_str``.

    ``&``, ``<``, ``>``, ``"`` and ``'`` are replaced with their HTML
    entities. Escaping ``&`` keeps the output unambiguous (an input of
    ``<`` and an input of ``&lt;`` no longer collide), and escaping quotes
    keeps the result safe inside quoted HTML attribute values.

    This only addresses HTML contexts; it is not a defence against SQL or
    shell injection.

    Args:
        input_str: Untrusted text.

    Returns:
        The escaped text.
    """
    return html.escape(input_str, quote=True)
def check_password_strength(password: str) -> tuple[bool, str]:
    """Check a password against the minimum strength rules.

    Rules are evaluated in order and the first failure is reported:
    at least 8 characters, one ASCII uppercase letter, one ASCII lowercase
    letter, one digit, and one special character. Any ASCII punctuation
    character counts as special; whitespace does not.

    Args:
        password: Candidate password.

    Returns:
        ``(is_strong, message)`` where ``message`` names the first rule that
        failed, or confirms the password is strong.
    """
    if len(password) < 8:
        return False, "Password must be at least 8 characters long"

    # Every ASCII punctuation character. Rejecting common symbols such as
    # "-", "_" or "=" would turn away otherwise strong passwords.
    special_chars = r"[!\"#$%&'()*+,\-./:;<=>?@\[\\\]^_`{|}~]"

    # Each rule pairs a pattern that must be present with its failure message.
    character_rules = (
        (r"[A-Z]", "Password must contain at least one uppercase letter"),
        (r"[a-z]", "Password must contain at least one lowercase letter"),
        (r"\d", "Password must contain at least one digit"),
        (special_chars, "Password must contain at least one special character"),
    )
    for required, message in character_rules:
        if re.search(required, password) is None:
            return False, message
    return True, "Password is strong"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    A shallow copy of ``data`` is extended with ``exp`` (expiry) and ``iat``
    (issued-at) claims, both derived from the same timezone-aware UTC
    instant, and encoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``.

    Args:
        data: Claims to embed in the token. The caller's dict is not mutated.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero-length delta is honoured rather than replaced by the default.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Compare against None: timedelta(0) is falsy and must not fall back to
    # the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # One timestamp for both claims keeps exp - iat exactly equal to the
    # lifetime; an aware UTC datetime replaces the deprecated utcnow().
    issued_at = datetime.now(timezone.utc)

    to_encode = data.copy()
    to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def verify_token(token: str) -> dict:
    """Decode and validate a JWT, returning its payload.

    The token is decoded with ``settings.SECRET_KEY``, restricted to
    ``settings.ALGORITHM``, with expiry verification enabled.

    Args:
        token: Encoded JWT.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 with detail "Token has expired" when the token is
            past its expiry, or 401 with detail "Could not validate
            credentials" for any other invalid token.
    """
    try:
        return jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
            options={"verify_exp": True},  # Ensure expiration is checked
        )
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.InvalidTokenError as exc:
        # PyJWT has no ``JWTError`` attribute; naming it here raised
        # AttributeError (a 500) for every malformed or tampered token.
        # InvalidTokenError is PyJWT's base class for token validation errors.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
def rate_limit_exceeded(identifier: str, limit: int = 100, window: int = 3600) -> bool:
    """Report whether ``identifier`` has gone over its rate limit.

    Placeholder: none of the arguments are inspected and no request counts
    are tracked, so every caller is allowed. A production version would keep
    counters in a shared cache such as Redis so limits hold across requests.
    """
    # Stub result: the limit is never considered exceeded.
    return False
def validate_user_input(text: str, max_length: int = 1000) -> bool:
    """Return True when ``text`` is non-empty, short enough and free of SQL-like tokens.

    The SQL check is a coarse, case-insensitive pattern match: whole-word SQL
    keywords and comment/statement punctuation (``--``, ``#``, ``/*``,
    ``*/``, ``;``) cause rejection.
    """
    # Empty values and anything longer than max_length are rejected outright.
    if not text:
        return False
    if len(text) > max_length:
        return False

    sql_keywords = r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION|WAITFOR|SLEEP)\b)"
    sql_punctuation = r"(--|#|\/\*|\*\/|;)"

    # Valid only if neither pattern occurs anywhere in the text.
    return not any(
        re.search(candidate, text, re.IGNORECASE)
        for candidate in (sql_keywords, sql_punctuation)
    )