guzhi/app/controllers/valuation.py
邹方成 3aa806ea6b feat(valuation): 添加datetime序列化支持并优化统计查询
为valuation模型添加json_encoders以支持datetime序列化
使用annotate替代原始SQL进行统计查询计数
在API响应中使用model_dump_json确保datetime正确序列化
2025-10-02 12:25:19 +08:00

193 lines
7.0 KiB
Python

from typing import List, Optional
from tortoise.expressions import Q
from tortoise.queryset import QuerySet
from tortoise.functions import Count
from app.models.valuation import ValuationAssessment
from app.schemas.valuation import (
ValuationAssessmentCreate,
ValuationAssessmentUpdate,
ValuationAssessmentQuery,
ValuationAssessmentOut,
ValuationAssessmentList
)
class ValuationController:
    """Controller for valuation assessment records.

    Wraps the ``ValuationAssessment`` ORM model with create / read / update /
    soft-delete operations, paginated listing and keyword search, simple
    statistics, and the review actions (approve / reject / admin notes).
    Every query issued here is restricted to rows with ``is_active=True``.
    """

    model = ValuationAssessment

    # Text columns matched with a case-insensitive "contains" filter when
    # building a list query.
    _TEXT_FILTER_FIELDS = ('asset_name', 'institution', 'industry', 'heritage_level')

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    async def _get_active(self, valuation_id: int) -> Optional[ValuationAssessment]:
        """Return the active record with the given ID, or ``None`` if absent."""
        return await self.model.filter(id=valuation_id, is_active=True).first()

    async def _apply_fields(self, valuation_id: int, fields: dict) -> Optional[ValuationAssessmentOut]:
        """Write ``fields`` onto an active record and return it serialized.

        Args:
            valuation_id: Primary key of the record to modify.
            fields: Mapping of attribute name to new value. When empty, the
                record is returned unchanged and nothing is saved.

        Returns:
            The serialized record, or ``None`` when no active record has
            that ID.
        """
        valuation = await self._get_active(valuation_id)
        if valuation is None:
            return None
        if fields:
            # update_from_dict only mutates the in-memory instance; save()
            # is what persists the change.
            valuation.update_from_dict(fields)
            await valuation.save()
        return ValuationAssessmentOut.model_validate(valuation)

    async def _paginate(self, queryset: QuerySet, page: int, size: int) -> ValuationAssessmentList:
        """Run ``queryset`` for one page, newest first, and wrap the result.

        ``page`` and ``size`` are clamped to a minimum of 1: a size of 0
        would divide by zero when computing the page count, and a page below
        1 would produce a negative offset. The returned ``page`` / ``size``
        reflect the clamped values.
        """
        page = max(page, 1)
        size = max(size, 1)

        total = await queryset.count()
        rows = await (
            queryset.order_by('-created_at')
            .offset((page - 1) * size)
            .limit(size)
        )
        items = [ValuationAssessmentOut.model_validate(row) for row in rows]

        # Ceiling division: number of pages needed to hold `total` rows.
        pages = (total + size - 1) // size
        return ValuationAssessmentList(
            items=items,
            total=total,
            page=page,
            size=size,
            pages=pages,
        )

    def _build_query(self, query: ValuationAssessmentQuery) -> QuerySet:
        """Translate the query schema into a filtered queryset.

        Empty / unset text filters are skipped; the remaining ones are
        combined with AND.
        """
        queryset = self.model.filter(is_active=True)

        for field in self._TEXT_FILTER_FIELDS:
            value = getattr(query, field)
            if value:
                queryset = queryset.filter(**{f'{field}__icontains': value})

        if query.is_active is not None:
            # NOTE(review): the base queryset already requires
            # is_active=True, so passing is_active=False here always yields
            # an empty result. Kept as-is to avoid exposing soft-deleted
            # rows; confirm whether that is the intended behaviour.
            queryset = queryset.filter(is_active=query.is_active)

        # Status filter; the schema may not define `status`, hence getattr.
        status = getattr(query, 'status', None)
        if status:
            queryset = queryset.filter(status=status)

        return queryset

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    async def create(self, data: ValuationAssessmentCreate) -> ValuationAssessmentOut:
        """Create a valuation assessment from the given payload."""
        valuation = await self.model.create(**data.model_dump())
        return ValuationAssessmentOut.model_validate(valuation)

    async def get_by_id(self, valuation_id: int) -> Optional[ValuationAssessmentOut]:
        """Return the active valuation assessment with this ID, or ``None``."""
        valuation = await self._get_active(valuation_id)
        if valuation is None:
            return None
        return ValuationAssessmentOut.model_validate(valuation)

    async def update(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
        """Update a valuation assessment with the fields explicitly set in ``data``.

        Fields the caller did not set are left untouched. Returns ``None``
        when no active record has that ID.
        """
        return await self._apply_fields(valuation_id, data.model_dump(exclude_unset=True))

    async def delete(self, valuation_id: int) -> bool:
        """Soft-delete a valuation assessment by clearing ``is_active``.

        Returns:
            ``True`` if a record was deactivated, ``False`` if no active
            record has that ID.
        """
        valuation = await self._get_active(valuation_id)
        if valuation is None:
            return False
        valuation.is_active = False
        await valuation.save()
        return True

    # ------------------------------------------------------------------
    # Listing, search and statistics
    # ------------------------------------------------------------------
    async def get_list(self, query: ValuationAssessmentQuery) -> ValuationAssessmentList:
        """Return one page of valuation assessments matching ``query``."""
        return await self._paginate(self._build_query(query), query.page, query.size)

    async def get_statistics(self) -> dict:
        """Return counts of active records: overall, per industry, per heritage level.

        Returns:
            A dict with keys ``total_count``, ``industry_distribution`` and
            ``heritage_level_distribution``; the two distributions are the
            grouped ``values()`` rows, each carrying the group value and
            its ``count``.
        """
        total_count = await self.model.filter(is_active=True).count()

        # Count per industry.
        industry_stats = await (
            self.model.filter(is_active=True)
            .group_by('industry')
            .annotate(count=Count('id'))
            .values('industry', 'count')
        )

        # Count per heritage level, ignoring rows without one.
        heritage_level_stats = await (
            self.model.filter(is_active=True, heritage_level__isnull=False)
            .group_by('heritage_level')
            .annotate(count=Count('id'))
            .values('heritage_level', 'count')
        )

        return {
            'total_count': total_count,
            'industry_distribution': industry_stats,
            'heritage_level_distribution': heritage_level_stats,
        }

    async def search(self, keyword: str, page: int = 1, size: int = 10) -> ValuationAssessmentList:
        """Return one page of active records whose text columns contain ``keyword``.

        The keyword is matched case-insensitively against asset name,
        institution, industry and heritage level; a hit in any one of them
        is enough.
        """
        queryset = self.model.filter(
            Q(asset_name__icontains=keyword)
            | Q(institution__icontains=keyword)
            | Q(industry__icontains=keyword)
            | Q(heritage_level__icontains=keyword),
            is_active=True,
        )
        return await self._paginate(queryset, page, size)

    # ------------------------------------------------------------------
    # Review actions
    # ------------------------------------------------------------------
    async def approve_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
        """Mark a valuation assessment as approved.

        ``admin_notes`` is stored only when it is a non-empty string.
        Returns ``None`` when no active record has that ID.
        """
        update_data = {"status": "approved"}
        if admin_notes:
            update_data["admin_notes"] = admin_notes
        return await self._apply_fields(valuation_id, update_data)

    async def reject_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
        """Mark a valuation assessment as rejected.

        ``admin_notes`` is stored only when it is a non-empty string.
        Returns ``None`` when no active record has that ID.
        """
        update_data = {"status": "rejected"}
        if admin_notes:
            update_data["admin_notes"] = admin_notes
        return await self._apply_fields(valuation_id, update_data)

    async def update_admin_notes(self, valuation_id: int, admin_notes: str) -> Optional[ValuationAssessmentOut]:
        """Replace the administrator notes on a valuation assessment.

        Returns ``None`` when no active record has that ID.
        """
        return await self._apply_fields(valuation_id, {"admin_notes": admin_notes})
# Create the module-level controller instance.
valuation_controller = ValuationController()