guzhi/app/controllers/valuation.py
邹方成 552c02516a feat(发票): 支持多附件上传和邮件发送功能
refactor(用户管理): 优化用户列表查询和备注字段处理

feat(估值): 评估报告和证书URL改为数组类型并添加下载地址

docs: 添加交易管理与用户备注功能增强实施计划

fix(邮件): 修复邮件发送接口的多附件支持问题

style: 清理注释代码和格式化文件
2025-11-25 20:09:50 +08:00

309 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Optional
from tortoise.expressions import Q
from tortoise.queryset import QuerySet
from tortoise.functions import Count
from app.models.valuation import ValuationAssessment, ValuationCalculationStep
from app.schemas.valuation import (
ValuationAssessmentCreate,
ValuationAssessmentUpdate,
ValuationAssessmentQuery,
ValuationAssessmentOut,
ValuationAssessmentList,
ValuationCalculationStepCreate,
ValuationCalculationStepOut
)
from app.models.user import AppUser
class ValuationController:
    """Controller for valuation assessments and their calculation steps."""
    # ORM model that the assessment methods query and create through.
    model = ValuationAssessment
    # ORM model that the calculation-step methods query and create through.
    step_model = ValuationCalculationStep
async def create_calculation_step(self, data: ValuationCalculationStepCreate) -> ValuationCalculationStepOut:
    """
    Persist a new valuation calculation step.

    Args:
        data (ValuationCalculationStepCreate): Payload describing the step.

    Returns:
        ValuationCalculationStepOut: The stored step, validated into the
        output schema.
    """
    payload = data.model_dump()
    created = await self.step_model.create(**payload)
    logger.info(
        "calcstep.create valuation_id={} order={} name={}",
        data.valuation_id,
        data.step_order,
        data.step_name,
    )
    result = ValuationCalculationStepOut.model_validate(created)
    return result
async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut:
    """
    Apply a partial update to an existing calculation step.

    Args:
        step_id (int): Primary key of the step to modify.
        update (dict): Field names mapped to their new values.

    Returns:
        ValuationCalculationStepOut: The step after the update was saved.

    Raises:
        ValueError: If no step with ``step_id`` exists.
    """
    existing = await self.step_model.filter(id=step_id).first()
    if existing is None:
        raise ValueError(f"calculation_step not found: {step_id}")

    # update_from_dict mutates the instance in memory; save() writes it out.
    existing.update_from_dict(update)
    await existing.save()

    changed_fields = list(update)
    logger.info("calcstep.update id={} fields={}", step_id, changed_fields)
    return ValuationCalculationStepOut.model_validate(existing)
async def get_calculation_steps(self, valuation_id: int) -> List[ValuationCalculationStepOut]:
    """
    Fetch every calculation step recorded for one valuation.

    Steps are read from the step model filtered by ``valuation_id`` and
    sorted by ``created_at`` ascending.

    Args:
        valuation_id (int): Identifier of the valuation whose steps are wanted.

    Returns:
        List[ValuationCalculationStepOut]: The matching steps in creation
        order; an empty list when the valuation has none.
    """
    ordered = self.step_model.filter(valuation_id=valuation_id).order_by('created_at')
    rows = await ordered
    logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(rows))
    return list(map(ValuationCalculationStepOut.model_validate, rows))
async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut:
    """
    Create a valuation assessment owned by the given user.

    Args:
        data (ValuationAssessmentCreate): Submitted assessment fields.
        user_id (int): Owner id stored on the new record; it overrides any
            ``user_id`` present in ``data``.

    Returns:
        ValuationAssessmentOut: The created record with ``user_phone`` attached.
    """
    fields = {**data.model_dump(), 'user_id': user_id}
    record = await self.model.create(**fields)
    return await self._attach_user_phone(ValuationAssessmentOut.model_validate(record))
async def get_by_id(self, valuation_id: int) -> Optional[ValuationAssessmentOut]:
    """
    Look up one active valuation assessment by primary key.

    Args:
        valuation_id (int): Primary key of the assessment.

    Returns:
        Optional[ValuationAssessmentOut]: The assessment with ``user_phone``
        attached, or ``None`` when no active record has that id.
    """
    record = await self.model.filter(id=valuation_id, is_active=True).first()
    if not record:
        return None
    validated = ValuationAssessmentOut.model_validate(record)
    return await self._attach_user_phone(validated)
async def update(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
    """
    Apply the explicitly-set fields of ``data`` to an active assessment.

    Only fields the caller actually set are written (``exclude_unset``).
    When the update carries a non-empty ``certificate_url``, ``audited_at``
    is stamped with the current local time as part of the same write.

    Args:
        valuation_id (int): Primary key of the assessment to update.
        data (ValuationAssessmentUpdate): Partial update payload.

    Returns:
        Optional[ValuationAssessmentOut]: The record after the update with
        ``user_phone`` attached, or ``None`` when no active record matches.
    """
    record = await self.model.filter(id=valuation_id, is_active=True).first()
    if record is None:
        return None

    changes = data.model_dump(exclude_unset=True)
    if changes:
        if changes.get('certificate_url'):
            from datetime import datetime
            changes['audited_at'] = datetime.now()
        # update_from_dict returns the instance, so the save can be chained.
        await record.update_from_dict(changes).save()

    return await self._attach_user_phone(ValuationAssessmentOut.model_validate(record))
async def delete(self, valuation_id: int) -> bool:
    """
    Soft-delete an assessment by clearing its ``is_active`` flag.

    Args:
        valuation_id (int): Primary key of the assessment to deactivate.

    Returns:
        bool: ``True`` when an active record was found and deactivated,
        ``False`` when no active record has that id.
    """
    target = await self.model.filter(id=valuation_id, is_active=True).first()
    found = target is not None
    if found:
        target.is_active = False
        await target.save()
    return found
async def get_list(self, query: ValuationAssessmentQuery) -> ValuationAssessmentList:
    """
    Return one page of assessments matching ``query``.

    Args:
        query (ValuationAssessmentQuery): Filter values plus ``page`` and
            ``size`` for pagination.

    Returns:
        ValuationAssessmentList: The page items (newest first, each with
        ``user_phone`` attached) together with total count and page count.
    """
    page, size = query.page, query.size
    base = self._build_query(query)

    # Count over the full filtered set, before the page window is applied.
    total = await base.count()

    window = base.offset((page - 1) * size).limit(size).order_by('-created_at')
    records = await window

    items = await self._attach_user_phone_bulk(
        [ValuationAssessmentOut.model_validate(record) for record in records]
    )

    # Ceiling division: a partially filled last page still counts as a page.
    pages = (total + size - 1) // size
    return ValuationAssessmentList(items=items, total=total, page=page, size=size, pages=pages)
def _build_query(self, query: ValuationAssessmentQuery) -> QuerySet:
    """
    Translate a list query into a filtered queryset of active assessments.

    Filters applied, each only when the corresponding query value is set:
    case-insensitive substring matches on ``asset_name``, ``institution``,
    ``industry`` and ``heritage_level``; exact matches on ``is_active`` and
    ``status``; a substring match on the owner's phone; and inclusive
    time ranges for submission (``created_at``) and audit (``updated_at``).
    Time bounds may be epoch milliseconds or ISO-8601 strings; a bound that
    parses as neither is ignored.

    Args:
        query (ValuationAssessmentQuery): Filter values from the caller.

    Returns:
        QuerySet: The un-awaited queryset with every filter applied.
    """
    from datetime import datetime

    def _parse_time(v: Optional[str]):
        """Parse epoch-milliseconds or ISO-8601 text; None if neither fits."""
        if not v:
            return None
        try:
            return datetime.fromtimestamp(int(v) / 1000)
        except (ValueError, TypeError, OverflowError, OSError):
            # Not a usable millisecond timestamp; fall through to ISO parsing.
            pass
        try:
            return datetime.fromisoformat(v)
        except (ValueError, TypeError):
            return None

    queryset = self.model.filter(is_active=True)

    # Case-insensitive substring filters on plain text columns.
    for field in ('asset_name', 'institution', 'industry', 'heritage_level'):
        value = getattr(query, field)
        if value:
            queryset = queryset.filter(**{f'{field}__icontains': value})

    # NOTE(review): this is ANDed with the is_active=True base filter above,
    # so is_active=False yields an empty result — confirm that is intended.
    if query.is_active is not None:
        queryset = queryset.filter(is_active=query.is_active)

    # status and phone are read defensively: the query schema may lack them.
    status = getattr(query, 'status', None)
    if status:
        queryset = queryset.filter(status=status)
    phone = getattr(query, 'phone', None)
    if phone:
        queryset = queryset.filter(user__phone__icontains=phone)

    # NOTE(review): the audited_* bounds filter on updated_at, not audited_at
    # — confirm which column the audit range should use.
    time_filters = (
        ('submitted_start', 'created_at__gte'),
        ('submitted_end', 'created_at__lte'),
        ('audited_start', 'updated_at__gte'),
        ('audited_end', 'updated_at__lte'),
    )
    for attr, lookup in time_filters:
        bound = _parse_time(getattr(query, attr, None))
        if bound:
            queryset = queryset.filter(**{lookup: bound})

    return queryset
async def get_statistics(self) -> dict:
    """
    Collect summary counts over all active assessments.

    Returns:
        dict: ``total_count`` (number of active records),
        ``industry_distribution`` (per-industry counts) and
        ``heritage_level_distribution`` (per-level counts, skipping records
        whose ``heritage_level`` is null). The two distributions are the rows
        produced by the grouped ``.values(...)`` queries.
    """
    active = self.model.filter(is_active=True)

    total_count = await active.count()

    by_industry = active.group_by('industry').annotate(count=Count('id'))
    industry_stats = await by_industry.values('industry', 'count')

    # Records without a heritage level are left out of that distribution.
    by_level = (
        active.filter(heritage_level__isnull=False)
        .group_by('heritage_level')
        .annotate(count=Count('id'))
    )
    heritage_level_stats = await by_level.values('heritage_level', 'count')

    stats = {
        'total_count': total_count,
        'industry_distribution': industry_stats,
        'heritage_level_distribution': heritage_level_stats,
    }
    return stats
async def search(self, keyword: str, page: int = 1, size: int = 10) -> ValuationAssessmentList:
    """
    Keyword search across the text columns of active assessments.

    A record matches when ``keyword`` occurs (case-insensitively) in its
    ``asset_name``, ``institution``, ``industry`` or ``heritage_level``.

    Args:
        keyword (str): Text to look for.
        page (int): 1-based page number; values below 1 are treated as 1.
        size (int): Page size; values below 1 are treated as 1.

    Returns:
        ValuationAssessmentList: The page items (newest first, each with
        ``user_phone`` attached) together with total count and page count.
        ``page`` and ``size`` in the result are the effective values used.
    """
    # Clamp pagination: size=0 would divide by zero in the page-count math,
    # and page<1 would produce a negative offset.
    page = max(page, 1)
    size = max(size, 1)

    matches = self.model.filter(
        Q(asset_name__icontains=keyword) |
        Q(institution__icontains=keyword) |
        Q(industry__icontains=keyword) |
        Q(heritage_level__icontains=keyword),
        is_active=True
    )

    # Count over the full match set, before the page window is applied.
    total = await matches.count()

    offset = (page - 1) * size
    valuations = await matches.offset(offset).limit(size).order_by('-created_at')

    items = [ValuationAssessmentOut.model_validate(v) for v in valuations]
    items = await self._attach_user_phone_bulk(items)

    # Ceiling division: a partially filled last page still counts as a page.
    pages = (total + size - 1) // size
    return ValuationAssessmentList(
        items=items,
        total=total,
        page=page,
        size=size,
        pages=pages,
    )
async def approve_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
    """
    Mark an active assessment as approved.

    Sets ``status`` to ``"approved"`` and ``audited_at`` to the current local
    time; ``admin_notes`` is written only when a non-empty value is given.

    Args:
        valuation_id (int): Primary key of the assessment.
        admin_notes (Optional[str]): Reviewer remarks to store, if any.

    Returns:
        Optional[ValuationAssessmentOut]: The updated record with
        ``user_phone`` attached, or ``None`` when no active record matches.
    """
    from datetime import datetime

    record = await self.model.filter(id=valuation_id, is_active=True).first()
    if record is None:
        return None

    changes = {"status": "approved", "audited_at": datetime.now()}
    if admin_notes:
        changes["admin_notes"] = admin_notes

    record.update_from_dict(changes)
    await record.save()
    return await self._attach_user_phone(ValuationAssessmentOut.model_validate(record))
async def reject_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
    """
    Mark an active assessment as rejected.

    Sets ``status`` to ``"rejected"`` and ``audited_at`` to the current local
    time; ``admin_notes`` is written only when a non-empty value is given.

    Args:
        valuation_id (int): Primary key of the assessment.
        admin_notes (Optional[str]): Reviewer remarks to store, if any.

    Returns:
        Optional[ValuationAssessmentOut]: The updated record with
        ``user_phone`` attached, or ``None`` when no active record matches.
    """
    from datetime import datetime

    record = await self.model.filter(id=valuation_id, is_active=True).first()
    if record is None:
        return None

    changes = {"status": "rejected", "audited_at": datetime.now()}
    if admin_notes:
        changes["admin_notes"] = admin_notes

    record.update_from_dict(changes)
    await record.save()
    return await self._attach_user_phone(ValuationAssessmentOut.model_validate(record))
async def update_admin_notes(self, valuation_id: int, admin_notes: str) -> Optional[ValuationAssessmentOut]:
    """
    Replace the administrator notes on an active assessment.

    Args:
        valuation_id (int): Primary key of the assessment.
        admin_notes (str): New notes text; stored as given.

    Returns:
        Optional[ValuationAssessmentOut]: The updated record with
        ``user_phone`` attached, or ``None`` when no active record matches.
    """
    record = await self.model.filter(id=valuation_id, is_active=True).first()
    if record is None:
        return None

    record.update_from_dict({"admin_notes": admin_notes})
    await record.save()
    return await self._attach_user_phone(ValuationAssessmentOut.model_validate(record))
async def _attach_user_phone(self, out: ValuationAssessmentOut) -> ValuationAssessmentOut:
    """
    Fill ``out.user_phone`` from the owning user's record.

    Args:
        out (ValuationAssessmentOut): Output object whose ``user_id`` names
            the owner; it is modified in place.

    Returns:
        ValuationAssessmentOut: The same object, with ``user_phone`` set to
        the owner's phone or ``None`` when no user (or no phone attribute)
        is found.
    """
    owner = await AppUser.filter(id=out.user_id).first()
    phone = None
    if owner:
        phone = getattr(owner, "phone", None)
    out.user_phone = phone
    return out
async def _attach_user_phone_bulk(self, items: List[ValuationAssessmentOut]) -> List[ValuationAssessmentOut]:
    """
    Fill ``user_phone`` on many output objects with a single user query.

    Args:
        items (List[ValuationAssessmentOut]): Objects to annotate in place.

    Returns:
        List[ValuationAssessmentOut]: The same list. When no item carries a
        truthy ``user_id`` the items are returned untouched; otherwise every
        item gets its owner's phone, or ``None`` if that user was not found.
    """
    user_ids = {entry.user_id for entry in items if entry.user_id}
    if user_ids:
        rows = await AppUser.filter(id__in=list(user_ids)).values("id", "phone")
        phones = {row["id"]: row["phone"] for row in rows}
        for entry in items:
            entry.user_phone = phones.get(entry.user_id)
    return items
# Create the shared module-level controller instance.
valuation_controller = ValuationController()
from app.log import logger