guzhi/app/controllers/app_user.py
邹方成 cc352d3184 feat: 重构后端服务并添加新功能
refactor: 优化API路由和响应模型
feat(admin): 添加App用户管理接口
feat(sms): 实现阿里云短信服务集成
feat(email): 添加SMTP邮件发送功能
feat(upload): 支持文件上传接口
feat(rate-limiter): 实现手机号限流器
fix: 修复计算步骤入库问题
docs: 更新API文档和测试计划
chore: 更新依赖和配置
2025-11-19 19:36:03 +08:00

148 lines
4.8 KiB
Python

from app.models.user import AppUser
from app.models.user import AppUserQuotaLog
from app.schemas.app_user import AppUserRegisterSchema, AppUserLoginSchema, AppUserUpdateSchema
from app.utils.password import get_password_hash, verify_password
from app.core.crud import CRUDBase
from fastapi.exceptions import HTTPException
from datetime import datetime
from typing import Optional
class AppUserController(CRUDBase[AppUser, AppUserRegisterSchema, AppUserUpdateSchema]):
    """Controller for AppUser records: registration, login, profile and quota updates."""

    def __init__(self):
        super().__init__(model=AppUser)

    async def register(self, register_data: AppUserRegisterSchema) -> AppUser:
        """
        Register a user from a phone number alone.

        The initial password is the last six characters of the phone number,
        stored as the result of ``get_password_hash``.

        Raises:
            HTTPException: status 400 when a user with this phone already exists.
        """
        phone = register_data.phone

        # Reject duplicate phone numbers before creating anything.
        if await self.model.filter(phone=phone).first():
            raise HTTPException(status_code=400, detail="手机号已存在")

        # Default password: trailing six characters of the phone number.
        password_hash = get_password_hash(phone[-6:])

        created = self.model(phone=phone, password=password_hash, is_active=True)
        await created.save()
        return created

    async def authenticate(self, login_data: AppUserLoginSchema) -> Optional[AppUser]:
        """
        Check login credentials.

        Looks up a user with ``is_active=True`` by phone and verifies the
        supplied password against the stored one. Returns the user on success,
        ``None`` when no such user is found or the password does not verify.
        """
        candidate = await self.model.filter(
            phone=login_data.phone, is_active=True
        ).first()
        if not candidate or not verify_password(login_data.password, candidate.password):
            return None
        return candidate

    async def get_user_by_id(self, user_id: int) -> Optional[AppUser]:
        """
        Fetch the user with ``is_active=True`` and the given ID, or ``None``.
        """
        query = self.model.filter(id=user_id, is_active=True)
        return await query.first()

    async def get_user_by_phone(self, phone: str) -> Optional[AppUser]:
        """
        Fetch the user with ``is_active=True`` and the given phone, or ``None``.
        """
        query = self.model.filter(phone=phone, is_active=True)
        return await query.first()

    async def update_last_login(self, user_id: int) -> bool:
        """
        Set ``last_login`` to ``datetime.now()`` for the given user.

        Returns ``True`` when the user was found and saved, ``False`` otherwise.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return False
        account.last_login = datetime.now()
        await account.save()
        return True

    async def update_user_info(self, user_id: int, update_data: AppUserUpdateSchema) -> Optional[AppUser]:
        """
        Apply the explicitly-set fields of ``update_data`` to the user.

        Returns the saved user, or ``None`` when no user has this ID.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return None

        # Only fields the caller actually set are copied onto the model.
        changes = update_data.model_dump(exclude_unset=True)
        for attr_name, attr_value in changes.items():
            setattr(account, attr_name, attr_value)

        await account.save()
        return account

    async def update_quota(
        self,
        operator_id: int,
        operator_name: str,
        user_id: int,
        target_count: Optional[int] = None,
        delta: Optional[int] = None,
        op_type: str = "调整",
        remark: Optional[str] = None,
    ) -> Optional[AppUser]:
        """
        Change a user's ``remaining_quota`` and write an ``AppUserQuotaLog`` row.

        ``target_count`` sets the quota to an absolute value and takes
        precedence over ``delta``, which adjusts it relative to the current
        value. Either result is clamped to a minimum of 0. When neither is
        given the quota keeps its current value, but the user is still saved
        and a log row is still created.

        Returns the saved user, or ``None`` when no user has this ID.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return None

        # A missing or falsy stored quota is treated as 0.
        stored_quota = getattr(account, "remaining_quota", 0)
        previous = int(stored_quota or 0)

        if target_count is not None:
            updated = max(0, int(target_count))
        elif delta is not None:
            updated = max(0, previous + int(delta))
        else:
            updated = previous

        account.remaining_quota = updated
        await account.save()

        # Record the before/after values together with who made the change.
        await AppUserQuotaLog.create(
            app_user_id=user_id,
            operator_id=operator_id,
            operator_name=operator_name,
            before_count=previous,
            after_count=updated,
            op_type=op_type,
            remark=remark,
        )
        return account

    async def change_password(self, user_id: int, old_password: str, new_password: str) -> bool:
        """
        Replace the user's password after verifying the current one.

        Returns ``False`` when the user is not found or ``old_password`` does
        not verify; ``True`` once the new hash has been saved.
        """
        account = await self.model.filter(id=user_id).first()
        if not account or not verify_password(old_password, account.password):
            return False

        account.password = get_password_hash(new_password)
        await account.save()
        return True

    async def deactivate_user(self, user_id: int) -> bool:
        """
        Mark the user inactive by setting ``is_active`` to ``False``.

        Returns ``True`` when the user was found and saved, ``False`` otherwise.
        """
        account = await self.model.filter(id=user_id).first()
        if not account:
            return False
        account.is_active = False
        await account.save()
        return True
# Create the module-level controller instance
app_user_controller = AppUserController()