guzhi/app/controllers/app_user.py
邹方成 552c02516a feat(发票): 支持多附件上传和邮件发送功能
refactor(用户管理): 优化用户列表查询和备注字段处理

feat(估值): 评估报告和证书URL改为数组类型并添加下载地址

docs: 添加交易管理与用户备注功能增强实施计划

fix(邮件): 修复邮件发送接口的多附件支持问题

style: 清理注释代码和格式化文件
2025-11-25 20:09:50 +08:00

155 lines
5.0 KiB
Python

from app.models.user import AppUser
from app.models.user import AppUserQuotaLog
from app.schemas.app_user import AppUserRegisterSchema, AppUserLoginSchema, AppUserUpdateSchema
from app.utils.password import get_password_hash, verify_password
from app.core.crud import CRUDBase
from fastapi.exceptions import HTTPException
from datetime import datetime
from typing import Optional
class AppUserController(CRUDBase[AppUser, AppUserRegisterSchema, AppUserUpdateSchema]):
    """Controller for app-user accounts: registration, login, profile and quota."""

    def __init__(self):
        super().__init__(model=AppUser)

    async def register(self, register_data: AppUserRegisterSchema) -> AppUser:
        """Create an account from a phone number alone.

        The initial password is the last six characters of the phone number,
        stored hashed. The duplicate check does not filter on ``is_active``,
        so an existing inactive account also blocks registration.

        Raises:
            HTTPException: 400 when the phone number is already registered.
        """
        phone = register_data.phone

        # Reject duplicates before doing any hashing work.
        if await self.model.filter(phone=phone).first():
            raise HTTPException(status_code=400, detail="手机号已存在")

        # Default password: trailing six characters of the phone number.
        password_hash = get_password_hash(phone[-6:])

        account = self.model(phone=phone, password=password_hash, is_active=True)
        await account.save()
        return account

    async def authenticate(self, login_data: AppUserLoginSchema) -> Optional[AppUser]:
        """Return the active user matching the phone/password pair, else ``None``."""
        candidate = await self.model.filter(
            phone=login_data.phone,
            is_active=True,
        ).first()
        if candidate and verify_password(login_data.password, candidate.password):
            return candidate
        return None

    async def get_user_by_id(self, user_id: int) -> Optional[AppUser]:
        """Look up an active user by primary key; ``None`` when there is no match."""
        active_by_id = self.model.filter(id=user_id, is_active=True)
        return await active_by_id.first()

    async def get_user_by_phone(self, phone: str) -> Optional[AppUser]:
        """Look up an active user by phone number; ``None`` when there is no match."""
        active_by_phone = self.model.filter(phone=phone, is_active=True)
        return await active_by_phone.first()

    async def update_last_login(self, user_id: int) -> bool:
        """Stamp ``last_login`` with the current naive local time.

        The lookup is by id only (``is_active`` is not checked).

        Returns:
            ``True`` when the user was found and saved, ``False`` otherwise.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return False

        account.last_login = datetime.now()
        await account.save()
        return True

    async def update_user_info(
        self, user_id: int, update_data: AppUserUpdateSchema
    ) -> Optional[AppUser]:
        """Apply the explicitly-set fields of ``update_data`` to the user.

        ``nickname`` is written to the model attribute ``alias``; ``avatar``
        is dropped and never written.

        Returns:
            The saved user, or ``None`` when no user has this id.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return None

        # Only fields the caller actually supplied are considered.
        changes = update_data.model_dump(exclude_unset=True)

        # The schema says "nickname", the model attribute is "alias".
        if "nickname" in changes:
            changes["alias"] = changes.pop("nickname")

        for attr_name, new_value in changes.items():
            if attr_name == "avatar":
                # Avatar is intentionally ignored by this method.
                continue
            setattr(account, attr_name, new_value)

        await account.save()
        return account

    async def update_quota(
        self,
        operator_id: int,
        operator_name: str,
        user_id: int,
        target_count: Optional[int] = None,
        delta: Optional[int] = None,
        op_type: str = "调整",
        remark: Optional[str] = None,
    ) -> Optional[AppUser]:
        """Change a user's remaining quota and record the change in the quota log.

        ``target_count`` sets an absolute balance and takes precedence over
        ``delta``, which adjusts the current balance. The result is clamped to
        be non-negative. When neither is given the balance is unchanged, but
        the user is still saved and a log row is still written.

        ``remark`` is stored on the log row only; it is not copied onto the
        user record. The balance save and the log insert are two separate
        awaits; no transaction is opened in this method.

        Returns:
            The saved user, or ``None`` when no user has this id.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return None

        # Treat a missing or falsy stored quota as zero.
        previous = int(getattr(account, "remaining_quota", 0) or 0)

        if target_count is not None:
            current = max(0, int(target_count))
        elif delta is not None:
            current = max(0, previous + int(delta))
        else:
            current = previous

        account.remaining_quota = current
        await account.save()

        await AppUserQuotaLog.create(
            app_user_id=user_id,
            operator_id=operator_id,
            operator_name=operator_name,
            before_count=previous,
            after_count=current,
            op_type=op_type,
            remark=remark,
        )
        return account

    async def change_password(self, user_id: int, old_password: str, new_password: str) -> bool:
        """Replace the password after verifying the current one.

        Returns:
            ``False`` when the user does not exist or ``old_password`` does
            not verify; ``True`` once the new hash has been saved.
        """
        account = await self.model.filter(id=user_id).first()
        if not account or not verify_password(old_password, account.password):
            return False

        account.password = get_password_hash(new_password)
        await account.save()
        return True

    async def deactivate_user(self, user_id: int) -> bool:
        """Set ``is_active`` to ``False`` for the user.

        Returns:
            ``True`` when the user was found and saved, ``False`` otherwise.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return False

        account.is_active = False
        await account.save()
        return True
# Create the controller instance at module level.
app_user_controller = AppUserController()