from app.models.user import AppUser from app.models.user import AppUserQuotaLog from app.schemas.app_user import AppUserRegisterSchema, AppUserLoginSchema, AppUserUpdateSchema from app.utils.password import get_password_hash, verify_password from app.core.crud import CRUDBase from fastapi.exceptions import HTTPException from datetime import datetime from typing import Optional class AppUserController(CRUDBase[AppUser, AppUserRegisterSchema, AppUserUpdateSchema]): """AppUser控制器""" def __init__(self): super().__init__(model=AppUser) async def register(self, register_data: AppUserRegisterSchema) -> AppUser: """ 用户注册 - 只需要手机号,默认使用手机号后六位作为密码 """ # 检查手机号是否已存在 existing_user = await self.model.filter(phone=register_data.phone).first() if existing_user: raise HTTPException(status_code=400, detail="手机号已存在") # 生成默认密码:手机号后六位 default_password = register_data.phone[-6:] hashed_password = get_password_hash(default_password) # 创建新用户 new_user = self.model( phone=register_data.phone, password=hashed_password, is_active=True ) await new_user.save() return new_user async def authenticate(self, login_data: AppUserLoginSchema) -> Optional[AppUser]: """ 用户认证 """ user = await self.model.filter( phone=login_data.phone, is_active=True ).first() if not user: return None if not verify_password(login_data.password, user.password): return None return user async def get_user_by_id(self, user_id: int) -> Optional[AppUser]: """ 根据ID获取用户 """ return await self.model.filter(id=user_id, is_active=True).first() async def get_user_by_phone(self, phone: str) -> Optional[AppUser]: """ 根据手机号获取用户 """ return await self.model.filter(phone=phone, is_active=True).first() async def update_last_login(self, user_id: int) -> bool: """ 更新最后登录时间 """ user = await self.model.filter(id=user_id).first() if user: user.last_login = datetime.now() await user.save() return True return False async def update_user_info(self, user_id: int, update_data: AppUserUpdateSchema) -> Optional[AppUser]: """ 更新用户信息 """ user = await self.model.filter(id=user_id).first() if not user: return None # 更新字段 update_dict = update_data.model_dump(exclude_unset=True) if "nickname" in update_dict: update_dict["alias"] = update_dict.pop("nickname") update_dict.pop("avatar", None) for field, value in update_dict.items(): setattr(user, field, value) await user.save() return user async def update_quota(self, operator_id: int, operator_name: str, user_id: int, target_count: Optional[int] = None, delta: Optional[int] = None, op_type: str = "调整", remark: Optional[str] = None) -> Optional[AppUser]: user = await self.model.filter(id=user_id).first() if not user: return None before = int(getattr(user, "remaining_quota", 0) or 0) after = before if target_count is not None: after = max(0, int(target_count)) elif delta is not None: after = max(0, before + int(delta)) user.remaining_quota = after await user.save() await AppUserQuotaLog.create( app_user_id=user_id, operator_id=operator_id, operator_name=operator_name, before_count=before, after_count=after, op_type=op_type, remark=remark, ) return user async def change_password(self, user_id: int, old_password: str, new_password: str) -> bool: """ 修改密码 """ user = await self.model.filter(id=user_id).first() if not user: return False # 验证原密码 if not verify_password(old_password, user.password): return False # 更新密码 user.password = get_password_hash(new_password) await user.save() return True async def deactivate_user(self, user_id: int) -> bool: """ 停用用户 """ user = await self.model.filter(id=user_id).first() if user: user.is_active = False await user.save() return True return False # 创建控制器实例 app_user_controller = AppUserController()