"""InsurTech Core API.

FastAPI application exposing JWT-authenticated CRUD endpoints for users,
"siniestros" (claims) and "movimientos" (policy movements), plus Excel
exports of both collections. Data is stored in MongoDB through Motor.
"""
import io
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional

from dotenv import load_dotenv
from fastapi import APIRouter, Depends, FastAPI, HTTPException, status
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from motor.motor_asyncio import AsyncIOMotorClient
from openpyxl import Workbook
from passlib.context import CryptContext
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from starlette.middleware.cors import CORSMiddleware

ROOT_DIR = Path(__file__).parent
load_dotenv(ROOT_DIR / '.env')

# Configure logging before anything below may want to emit a message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# JWT Configuration
# NOTE(review): the fallback secret is public in the source code; any
# deployment that relies on it issues forgeable tokens. It is kept only for
# backward compatibility and a warning is logged when it is used.
SECRET_KEY = os.environ.get('JWT_SECRET', 'insurtech-secret-key-2024')
if 'JWT_SECRET' not in os.environ:
    logger.warning(
        "JWT_SECRET is not set; falling back to the built-in development secret"
    )
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24

# Roles understood by the authorization checks in this module.
VALID_ROLES = ("user", "admin")

# Media type used by both Excel export endpoints.
XLSX_MEDIA_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# MongoDB connection (both variables are required; a missing one raises KeyError)
mongo_url = os.environ['MONGO_URL']
client = AsyncIOMotorClient(mongo_url)
db = client[os.environ['DB_NAME']]

app = FastAPI(title="InsurTech Core API")
api_router = APIRouter(prefix="/api")
security = HTTPBearer()


# ============ MODELS ============

class UserBase(BaseModel):
    """Fields shared by every user representation."""

    email: EmailStr
    nombre: str
    role: str = "user"  # user or admin


class UserCreate(UserBase):
    """Registration payload: the base fields plus a plain-text password."""

    password: str


class UserUpdate(BaseModel):
    """Partial user update; fields left as ``None`` are not modified."""

    nombre: Optional[str] = None
    role: Optional[str] = None
    is_active: Optional[bool] = None


class User(UserBase):
    """Stored user with a generated id, active flag and creation timestamp."""

    model_config = ConfigDict(extra="ignore")

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class LoginRequest(BaseModel):
    """Credentials submitted to the login endpoint."""

    email: EmailStr
    password: str


class Token(BaseModel):
    """Login response: the bearer token plus a summary of the user."""

    access_token: str
    token_type: str
    user: dict


class SiniestroBase(BaseModel):
    """Client-supplied fields of a siniestro; dates are kept as plain strings."""

    fecha_registro: str
    contratante: str
    asegurado: str
    descripcion: str
    folio: str
    fecha_entrega: str


class SiniestroCreate(SiniestroBase):
    """Payload accepted when creating or replacing a siniestro."""


class Siniestro(SiniestroBase):
    """Stored siniestro with generated id, author id and creation timestamp."""

    model_config = ConfigDict(extra="ignore")

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    created_by: str = ""
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class MovimientoBase(BaseModel):
    """Client-supplied fields of a movimiento; dates are kept as plain strings."""

    tipo: str  # Alta, Baja, Cambio
    fecha_registro: str
    contratante: str
    asegurado: str
    descripcion: str
    folio: str
    fecha_entrega: str


class MovimientoCreate(MovimientoBase):
    """Payload accepted when creating or replacing a movimiento."""


class Movimiento(MovimientoBase):
    """Stored movimiento with generated id, author id and creation timestamp."""

    model_config = ConfigDict(extra="ignore")

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    created_by: str = ""
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


# ============ AUTH HELPERS ============

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the stored ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash ``password`` with the configured password context (bcrypt)."""
    return pwd_context.hash(password)


def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    The input dict is copied, so the caller's object is not modified. The
    token expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` after the current UTC time.
    """
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the bearer token to the stored user document.

    Returns:
        The user document without its ``_id`` and ``password`` fields.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, references an unknown user, or the user has been
            deactivated.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token inválido o expirado",
        headers=auth_headers,
    )

    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception from None

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    user = await db.users.find_one({"id": user_id}, {"_id": 0, "password": 0})
    if user is None:
        raise credentials_exception

    # Login already refuses deactivated accounts; enforce the same rule here
    # so that a token issued before deactivation stops working immediately.
    if not user.get("is_active", True):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Usuario desactivado",
            headers=auth_headers,
        )
    return user


async def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
    """Return the authenticated user, or raise 403 unless their role is "admin"."""
    if current_user.get("role") != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Se requieren permisos de administrador"
        )
    return current_user


# ============ AUTH ROUTES ============

@api_router.post("/auth/register", response_model=dict)
async def register(user_data: UserCreate):
    """Register a new user.

    The ``role`` sent by the client is ignored: the first user ever stored
    becomes "admin" and every later one "user". Responds 400 when the email
    is already registered.
    """
    existing = await db.users.find_one({"email": user_data.email})
    if existing:
        raise HTTPException(status_code=400, detail="El email ya está registrado")

    # NOTE(review): the existence check and this count are separate queries
    # from the insert, so concurrent registrations can race; a unique index
    # on "email" would be needed to close that gap.
    user_count = await db.users.count_documents({})
    role = "admin" if user_count == 0 else "user"

    user = User(
        email=user_data.email,
        nombre=user_data.nombre,
        role=role
    )
    user_dict = user.model_dump()
    user_dict["password"] = get_password_hash(user_data.password)
    # Timestamps are persisted as ISO-8601 strings.
    user_dict["created_at"] = user_dict["created_at"].isoformat()

    await db.users.insert_one(user_dict)
    return {"message": "Usuario registrado exitosamente", "role": role}


@api_router.post("/auth/login", response_model=Token)
async def login(login_data: LoginRequest):
    """Validate credentials and return a bearer token plus a user summary.

    Responds 401 for an unknown email, a wrong password or a deactivated user.
    """
    user = await db.users.find_one({"email": login_data.email})
    if not user or not verify_password(login_data.password, user["password"]):
        raise HTTPException(status_code=401, detail="Credenciales incorrectas")

    if not user.get("is_active", True):
        raise HTTPException(status_code=401, detail="Usuario desactivado")

    access_token = create_access_token(data={"sub": user["id"]})
    user_response = {
        "id": user["id"],
        "email": user["email"],
        "nombre": user["nombre"],
        "role": user["role"]
    }
    return Token(access_token=access_token, token_type="bearer", user=user_response)


@api_router.get("/auth/me", response_model=dict)
async def get_me(current_user: dict = Depends(get_current_user)):
    """Return the document of the authenticated user."""
    return current_user


# ============ USERS ROUTES (Admin only) ============

@api_router.get("/users", response_model=List[dict])
async def get_users(admin: dict = Depends(require_admin)):
    """List users (at most 1000) without their ``_id`` and ``password`` fields."""
    return await db.users.find({}, {"_id": 0, "password": 0}).to_list(1000)


@api_router.put("/users/{user_id}", response_model=dict)
async def update_user(user_id: str, user_update: UserUpdate, admin: dict = Depends(require_admin)):
    """Apply the non-null fields of ``user_update`` to the given user.

    Responds 400 when nothing is provided or when ``role`` is not one of
    ``VALID_ROLES``, and 404 when no user has that id.
    """
    update_data = user_update.model_dump(exclude_none=True)
    if not update_data:
        raise HTTPException(status_code=400, detail="No hay datos para actualizar")

    # require_admin compares the role against the exact string "admin", so an
    # unknown or misspelled role would silently strip privileges.
    if "role" in update_data and update_data["role"] not in VALID_ROLES:
        raise HTTPException(status_code=400, detail="Rol inválido")

    result = await db.users.update_one({"id": user_id}, {"$set": update_data})
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    return {"message": "Usuario actualizado"}


@api_router.delete("/users/{user_id}", response_model=dict)
async def delete_user(user_id: str, admin: dict = Depends(require_admin)):
    """Delete a user; an admin cannot delete their own account (400)."""
    if admin["id"] == user_id:
        raise HTTPException(status_code=400, detail="No puedes eliminarte a ti mismo")

    result = await db.users.delete_one({"id": user_id})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    return {"message": "Usuario eliminado"}


# ============ SINIESTROS ROUTES ============

@api_router.post("/siniestros", response_model=dict)
async def create_siniestro(data: SiniestroCreate, current_user: dict = Depends(get_current_user)):
    """Store a new siniestro attributed to the authenticated user."""
    siniestro = Siniestro(**data.model_dump(), created_by=current_user["id"])
    doc = siniestro.model_dump()
    doc["created_at"] = doc["created_at"].isoformat()
    await db.siniestros.insert_one(doc)
    return {"message": "Siniestro registrado", "id": siniestro.id}


@api_router.get("/siniestros", response_model=List[dict])
async def get_siniestros(current_user: dict = Depends(get_current_user)):
    """List siniestros, newest first (at most 1000)."""
    return await db.siniestros.find({}, {"_id": 0}).sort("created_at", -1).to_list(1000)


@api_router.put("/siniestros/{siniestro_id}", response_model=dict)
async def update_siniestro(siniestro_id: str, data: SiniestroCreate, admin: dict = Depends(require_admin)):
    """Overwrite the client-supplied fields of a siniestro (admin only)."""
    result = await db.siniestros.update_one(
        {"id": siniestro_id},
        {"$set": data.model_dump()}
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Siniestro no encontrado")
    return {"message": "Siniestro actualizado"}


@api_router.delete("/siniestros/{siniestro_id}", response_model=dict)
async def delete_siniestro(siniestro_id: str, admin: dict = Depends(require_admin)):
    """Delete a siniestro by id (admin only)."""
    result = await db.siniestros.delete_one({"id": siniestro_id})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Siniestro no encontrado")
    return {"message": "Siniestro eliminado"}


# ============ MOVIMIENTOS ROUTES ============

@api_router.post("/movimientos", response_model=dict)
async def create_movimiento(data: MovimientoCreate, current_user: dict = Depends(get_current_user)):
    """Store a new movimiento attributed to the authenticated user."""
    movimiento = Movimiento(**data.model_dump(), created_by=current_user["id"])
    doc = movimiento.model_dump()
    doc["created_at"] = doc["created_at"].isoformat()
    await db.movimientos.insert_one(doc)
    return {"message": "Movimiento registrado", "id": movimiento.id}


@api_router.get("/movimientos", response_model=List[dict])
async def get_movimientos(current_user: dict = Depends(get_current_user)):
    """List movimientos, newest first (at most 1000)."""
    return await db.movimientos.find({}, {"_id": 0}).sort("created_at", -1).to_list(1000)


@api_router.put("/movimientos/{movimiento_id}", response_model=dict)
async def update_movimiento(movimiento_id: str, data: MovimientoCreate, admin: dict = Depends(require_admin)):
    """Overwrite the client-supplied fields of a movimiento (admin only)."""
    result = await db.movimientos.update_one(
        {"id": movimiento_id},
        {"$set": data.model_dump()}
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Movimiento no encontrado")
    return {"message": "Movimiento actualizado"}


@api_router.delete("/movimientos/{movimiento_id}", response_model=dict)
async def delete_movimiento(movimiento_id: str, admin: dict = Depends(require_admin)):
    """Delete a movimiento by id (admin only)."""
    result = await db.movimientos.delete_one({"id": movimiento_id})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Movimiento no encontrado")
    return {"message": "Movimiento eliminado"}


# ============ EXPORT TO EXCEL ============

def _build_xlsx_response(
    title: str,
    headers: List[str],
    fields: List[str],
    records: List[dict],
    filename: str,
) -> StreamingResponse:
    """Render ``records`` as a one-sheet workbook and wrap it in a download.

    Args:
        title: Worksheet title.
        headers: Column captions written to the first row.
        fields: Document keys, in the same order as ``headers``.
        records: Documents to export, one per row.
        filename: File name advertised in ``Content-Disposition``.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = title
    ws.append(headers)

    for row_idx, record in enumerate(records, start=2):
        for col_idx, field in enumerate(fields, start=1):
            # A document lacking a field yields an empty cell instead of
            # failing the whole export with a KeyError.
            cell = ws.cell(row=row_idx, column=col_idx, value=record.get(field, ""))
            # openpyxl stores strings starting with "=" as formulas. The
            # values are user-supplied text, so force them back to plain
            # strings to prevent spreadsheet formula injection.
            if cell.data_type == "f":
                cell.data_type = "s"

    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    return StreamingResponse(
        output,
        media_type=XLSX_MEDIA_TYPE,
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )


@api_router.get("/export/siniestros")
async def export_siniestros(current_user: dict = Depends(get_current_user)):
    """Download the siniestros (at most 1000, newest first) as an .xlsx file."""
    siniestros = await db.siniestros.find({}, {"_id": 0}).sort("created_at", -1).to_list(1000)
    return _build_xlsx_response(
        title="Siniestros",
        headers=["Folio", "Fecha Registro", "Contratante", "Asegurado", "Descripción", "Fecha Entrega"],
        fields=["folio", "fecha_registro", "contratante", "asegurado", "descripcion", "fecha_entrega"],
        records=siniestros,
        filename="siniestros.xlsx",
    )


@api_router.get("/export/movimientos")
async def export_movimientos(current_user: dict = Depends(get_current_user)):
    """Download the movimientos (at most 1000, newest first) as an .xlsx file."""
    movimientos = await db.movimientos.find({}, {"_id": 0}).sort("created_at", -1).to_list(1000)
    return _build_xlsx_response(
        title="Movimientos",
        headers=["Folio", "Tipo", "Fecha Registro", "Contratante", "Asegurado", "Descripción", "Fecha Entrega"],
        fields=["folio", "tipo", "fecha_registro", "contratante", "asegurado", "descripcion", "fecha_entrega"],
        records=movimientos,
        filename="movimientos.xlsx",
    )


# ============ ROOT ============

@api_router.get("/")
async def root():
    """Return a static message identifying the API."""
    return {"message": "InsurTech Core API"}


app.include_router(api_router)

# Strip whitespace around each configured origin ("a, b" would otherwise
# yield " b", which never matches a request's Origin header).
cors_origins = [
    origin.strip()
    for origin in os.environ.get('CORS_ORIGINS', '*').split(',')
    if origin.strip()
]

# NOTE(review): allow_credentials=True together with the default "*" origin
# is very permissive; set CORS_ORIGINS explicitly in production.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("shutdown")
async def shutdown_db_client():
    """Close the MongoDB client when the application shuts down."""
    client.close()