# guzhi/app/controllers/valuation.py
#
# 703 lines
# 27 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from datetime import datetime
from typing import Any, Dict, List, Optional

from tortoise.expressions import Q
from tortoise.functions import Count
from tortoise.queryset import QuerySet

from app.models.user import AppUser
from app.models.valuation import ValuationAssessment, ValuationCalculationStep
from app.schemas.valuation import (
    ValuationAssessmentCreate,
    ValuationAssessmentUpdate,
    ValuationAssessmentQuery,
    ValuationAssessmentOut,
    ValuationAssessmentList,
    ValuationCalculationStepCreate,
    ValuationCalculationStepOut
)
from app.utils.calculation_engine.formula_registry import get_formula_meta
class ValuationController:
    """Controller for valuation assessments and their calculation-step records.

    Groups three kinds of operations:

    * recording and reading per-formula calculation steps,
    * rendering those steps as a Markdown report,
    * CRUD, listing, search, statistics and review actions on assessments.

    Every assessment lookup filters on ``is_active=True``; ``delete`` is a
    soft delete that clears that flag.
    """

    # ORM model for valuation assessments.
    model = ValuationAssessment
    # ORM model for per-formula calculation step records.
    step_model = ValuationCalculationStep

    # ------------------------------------------------------------------
    # Calculation steps
    # ------------------------------------------------------------------

    async def create_calculation_step(self, data: ValuationCalculationStepCreate) -> ValuationCalculationStepOut:
        """Create one calculation-step record.

        Args:
            data: Payload describing the step to insert.

        Returns:
            The created step converted to its output schema.
        """
        step = await self.step_model.create(**data.model_dump())
        logger.info(
            "calcstep.create valuation_id={} order={} name={}",
            data.valuation_id,
            data.step_order,
            data.step_name,
        )
        return ValuationCalculationStepOut.model_validate(step)

    @staticmethod
    def _with_duration(result: Any, duration_ms: int) -> Dict[str, Any]:
        """Return a copy of ``result`` with ``duration_ms`` added.

        A missing or non-dict ``result`` is replaced by a fresh dict. Working
        on a copy keeps the caller's dict untouched.
        """
        merged = dict(result) if isinstance(result, dict) else {}
        merged["duration_ms"] = duration_ms
        return merged

    async def log_formula_step(
        self,
        valuation_id: int,
        formula_code: str,
        *,
        status: str = "processing",
        input_params: Optional[Dict[str, Any]] = None,
        output_result: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None,
        step_description: Optional[str] = None,
        duration_ms: Optional[int] = None,
    ) -> ValuationCalculationStepOut:
        """Insert or update the step record of one formula node (upsert).

        The record is looked up by ``(valuation_id, formula_code)``. Static
        attributes (name, formula text, parent, group, order) come from the
        formula registry via ``get_formula_meta``.

        Args:
            valuation_id: Assessment the step belongs to.
            formula_code: Registry code of the formula node.
            status: Step status to store.
            input_params: Stored only when not ``None``.
            output_result: Stored only when not ``None``. Never mutated.
            error_message: Stored only when not ``None``.
            step_description: Overrides the registry formula text as the
                step description when truthy.
            duration_ms: When given, stored under ``output_result["duration_ms"]``.
                If ``output_result`` is not supplied on an update, the value
                is merged into the output already stored on the record
                instead of replacing it.

        Returns:
            The created or updated step as its output schema.

        Raises:
            Exception: Whatever the ORM raises on insert, unless it looks like
                a unique-key conflict that can be resolved by re-querying.
        """
        meta = get_formula_meta(formula_code)
        description = step_description or meta.formula

        # Fields written identically on insert and on update.
        shared_fields: Dict[str, Any] = {
            "formula_name": meta.name,
            "formula_text": meta.formula,
            "parent_formula_code": meta.parent_code,
            "group_code": meta.group_code,
            "step_order": meta.order,
            "step_name": meta.name,
            "step_description": description,
            "status": status,
        }
        optional_fields = {
            "input_params": input_params,
            "output_result": output_result,
            "error_message": error_message,
        }
        shared_fields.update(
            {key: value for key, value in optional_fields.items() if value is not None}
        )

        def build_update(existing) -> Dict[str, Any]:
            """Build the update payload for an already-stored step."""
            payload = dict(shared_fields)
            if duration_ms is not None:
                # Without a new result, keep what is stored and only add the timing.
                base = (
                    output_result
                    if output_result is not None
                    else getattr(existing, "output_result", None)
                )
                payload["output_result"] = self._with_duration(base, duration_ms)
            return payload

        async def find_step():
            return await self.step_model.filter(
                valuation_id=valuation_id,
                formula_code=meta.code,
            ).first()

        step = await find_step()

        # Diagnostic only: legacy rows without a formula_code are not expected.
        if not step and meta.code:
            legacy_count = await self.step_model.filter(
                valuation_id=valuation_id,
                formula_code__isnull=True,
            ).count()
            if legacy_count:
                logger.warning(
                    "calcstep.log_formula found old records with NULL formula_code: valuation_id={} count={}",
                    valuation_id,
                    legacy_count,
                )

        logger.info(
            "calcstep.log_formula query: valuation_id={} formula_code={} found={}",
            valuation_id,
            meta.code,
            step is not None,
        )

        if step:
            await step.update_from_dict(build_update(step)).save()
            logger.info(
                "calcstep.log_formula updated valuation_id={} formula_code={}",
                valuation_id,
                meta.code,
            )
            return ValuationCalculationStepOut.model_validate(step)

        create_payload: Dict[str, Any] = {
            "valuation_id": valuation_id,
            "formula_code": meta.code,
            **shared_fields,
        }
        if duration_ms is not None:
            create_payload["output_result"] = self._with_duration(output_result, duration_ms)

        try:
            step = await self.step_model.create(**create_payload)
        except Exception as exc:
            # A concurrent request may have inserted the same key between the
            # lookup and the insert. The driver error type is backend-specific,
            # so the conflict is recognised by its message text.
            message = str(exc).lower()
            if not any(marker in message for marker in ("duplicate", "unique", "1062")):
                raise
            logger.warning(
                "calcstep.log_formula duplicate key detected, retrying query: {}",
                str(exc),
            )
            step = await find_step()
            if not step:
                logger.error(
                    "calcstep.log_formula failed to find record after duplicate key error: valuation_id={} formula_code={}",
                    valuation_id,
                    meta.code,
                )
                raise
            await step.update_from_dict(build_update(step)).save()
            logger.info(
                "calcstep.log_formula updated after duplicate key: valuation_id={} formula_code={}",
                valuation_id,
                meta.code,
            )
        else:
            logger.info(
                "calcstep.log_formula created valuation_id={} formula_code={}",
                valuation_id,
                meta.code,
            )
        return ValuationCalculationStepOut.model_validate(step)

    async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut:
        """Apply ``update`` to the step with primary key ``step_id``.

        Raises:
            ValueError: If no step with that id exists.
        """
        step = await self.step_model.filter(id=step_id).first()
        if not step:
            raise ValueError(f"calculation_step not found: {step_id}")
        await step.update_from_dict(update).save()
        logger.info(
            "calcstep.update id={} fields={}",
            step_id,
            list(update.keys()),
        )
        return ValuationCalculationStepOut.model_validate(step)

    async def get_calculation_steps(self, valuation_id: int) -> List[ValuationCalculationStepOut]:
        """Return all steps of an assessment, ordered by ``created_at`` ascending.

        Returns an empty list when the assessment has no steps.
        """
        steps = await self.step_model.filter(valuation_id=valuation_id).order_by('created_at')
        logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps))
        return [ValuationCalculationStepOut.model_validate(step) for step in steps]

    # ------------------------------------------------------------------
    # Markdown report
    # ------------------------------------------------------------------

    async def get_calculation_report_markdown(self, valuation_id: int) -> str:
        """Render the calculation steps of an assessment as a Markdown report.

        Steps are loaded ordered by ``step_order``, arranged into a tree by
        their parent formula code, and rendered with name, formula, status,
        inputs, outputs and error message.

        Args:
            valuation_id: Id of the (active) assessment.

        Returns:
            The report as a Markdown string; a short placeholder report when
            the assessment has no steps.

        Raises:
            ValueError: If no active assessment with that id exists.
        """
        valuation = await self._get_active(valuation_id)
        if not valuation:
            raise ValueError(f"估值记录不存在: {valuation_id}")

        steps = await self.step_model.filter(valuation_id=valuation_id).order_by('step_order')
        if not steps:
            return f"# 估值计算报告\n\n**估值ID**: {valuation_id}\n\n**资产名称**: {valuation.asset_name}\n\n> 暂无计算步骤记录。\n"

        # Plain dicts are easier to arrange into a tree than ORM objects.
        steps_data = [
            ValuationCalculationStepOut.model_validate(step).model_dump() for step in steps
        ]
        formula_tree = self._build_formula_tree(steps_data)
        markdown = self._generate_markdown(valuation, formula_tree)
        logger.info("calcstep.report_markdown generated valuation_id={} steps_count={}", valuation_id, len(steps_data))
        return markdown

    def _build_formula_tree(self, steps: List[Dict]) -> Dict:
        """Arrange step dicts into a tree keyed by formula code.

        Steps without a ``formula_code`` are ignored. When several steps share
        a code, only the first one is kept, and it appears in the tree once.
        A step whose parent is unknown (or is itself) becomes a root.

        Args:
            steps: Step dicts, each with ``formula_code``,
                ``parent_formula_code`` and ``step_order`` entries.

        Returns:
            ``{'roots': [...], 'all': {...}}`` where each node is
            ``{'step': <dict>, 'children': [<node>, ...]}``; ``roots`` and all
            ``children`` lists are sorted by ``step_order``.
        """
        tree: Dict[str, Dict[str, Any]] = {}
        for step in steps:
            code = step.get('formula_code')
            if code and code not in tree:
                tree[code] = {'step': step, 'children': []}

        # Link each unique node exactly once so duplicates cannot be attached twice.
        root_nodes: List[Dict[str, Any]] = []
        for code, node in tree.items():
            parent_code = node['step'].get('parent_formula_code')
            if parent_code and parent_code != code and parent_code in tree:
                tree[parent_code]['children'].append(node)
            else:
                root_nodes.append(node)

        def order_key(node: Dict[str, Any]) -> float:
            # step_order may be present but None; treat that as 0.
            return float(node['step'].get('step_order') or 0)

        def sort_nodes(nodes: List[Dict[str, Any]]) -> None:
            nodes.sort(key=order_key)
            for node in nodes:
                if node['children']:
                    sort_nodes(node['children'])

        sort_nodes(root_nodes)
        return {'roots': root_nodes, 'all': tree}

    def _generate_markdown(self, valuation, formula_tree: Dict) -> str:
        """Render the report for ``valuation`` from a formula tree.

        Args:
            valuation: Object exposing ``id``, ``asset_name``, ``institution``,
                ``industry``, ``heritage_level``, ``created_at``,
                ``final_value_ab``, ``model_value_b``, ``market_value_c`` and
                ``dynamic_pledge_rate``.
            formula_tree: Result of ``_build_formula_tree``; only ``'roots'``
                is read.

        Returns:
            The Markdown document as a single string.
        """
        lines: List[str] = [
            "# 估值计算报告",
            "",
            "## 基本信息",
            "",
            "| 字段 | 值 |",
            "|------|----|",
            f"| 估值ID | {valuation.id} |",
            f"| 资产名称 | {valuation.asset_name} |",
            f"| 所属机构 | {valuation.institution} |",
            f"| 所属行业 | {valuation.industry} |",
        ]
        heritage_value = valuation.heritage_level if valuation.heritage_level else "-"
        lines.append(f"| 非遗等级 | {heritage_value} |")
        created_at_str = (
            valuation.created_at.strftime("%Y-%m-%d %H:%M:%S") if valuation.created_at else "N/A"
        )
        lines.append(f"| 创建时间 | {created_at_str} |")
        lines.append("")

        # Result summary, only once a final value exists.
        if valuation.final_value_ab is not None:
            lines.append("## 计算结果摘要")
            lines.append("")
            lines.append("| 项目 | 数值(万元) |")
            lines.append("|------|-------------|")
            if valuation.model_value_b is not None:
                lines.append(f"| 模型估值B | {valuation.model_value_b:.2f} |")
            if valuation.market_value_c is not None:
                lines.append(f"| 市场估值C | {valuation.market_value_c:.2f} |")
            lines.append(f"| **最终估值AB** | **{valuation.final_value_ab:.2f}** |")
            if valuation.dynamic_pledge_rate is not None:
                lines.append(f"| 动态质押率 | {valuation.dynamic_pledge_rate:.4f} |")
            lines.append("")

        lines.append("## 详细计算过程")
        lines.append("")

        def _format_json_block(value: Any, indent_prefix: str = "") -> List[str]:
            """Return ``value`` as the lines of a fenced ``json`` code block."""
            json_text = json.dumps(value, ensure_ascii=False, indent=2)
            block_lines = [f"{indent_prefix}```json"]
            block_lines.extend(f"{indent_prefix}{line}" for line in json_text.splitlines())
            block_lines.append(f"{indent_prefix}```")
            return block_lines

        # Markdown has no heading deeper than six '#', so deep levels share one.
        heading_levels = ["####", "#####", "######", "######", "######"]
        status_labels = {
            'processing': '计算中',
            'completed': '已完成',
            'failed': '计算失败'
        }

        def render_node(node: Dict, level: int = 0, prefix: str = ""):
            """Append one node and, recursively, its children to ``lines``."""
            step = node['step']
            heading = heading_levels[min(level, len(heading_levels) - 1)]
            # Keys are normally present but may hold None, so fall back on
            # falsy values rather than on missing keys.
            name = step.get('formula_name') or step.get('step_name') or '未知'
            formula_text = step.get('formula_text') or step.get('step_description') or ''
            status = step.get('status') or 'unknown'
            input_params = step.get('input_params')
            output_result = step.get('output_result')
            error_message = step.get('error_message')
            duration_ms = None
            if output_result and isinstance(output_result, dict):
                duration_ms = output_result.get('duration_ms')

            lines.append(f"{heading} {prefix}{name}")
            lines.append("")

            if formula_text:
                if level == 0:
                    lines.append(f"> {formula_text}")
                    lines.append("")
                else:
                    lines.append("计算公式:")
                    lines.append(f"`{formula_text}`")
                    lines.append("")

            lines.append(f"**状态**: {status_labels.get(status, status)}")
            if duration_ms is not None:
                lines.append(f"**耗时**: {duration_ms}ms")
            lines.append("")

            if input_params:
                lines.append("**输入参数**:")
                lines.extend(_format_json_block(input_params, ""))
                lines.append("")

            if output_result:
                if isinstance(output_result, dict):
                    # duration_ms is already shown next to the status.
                    result_display = {
                        key: value for key, value in output_result.items() if key != 'duration_ms'
                    }
                else:
                    result_display = output_result
                if result_display:
                    lines.append("**输出结果**:")
                    lines.extend(_format_json_block(result_display, ""))
                    lines.append("")

            if error_message:
                lines.append(f"> ⚠️ **错误**: {error_message}")
                lines.append("")

            # Children are numbered hierarchically: "1.", "1.1.", "1.1.2.", ...
            for idx, child in enumerate(node.get('children', []), start=1):
                lines.append("")
                render_node(child, level + 1, f"{prefix}{idx}.")
            lines.append("")

        for idx, root in enumerate(formula_tree['roots'], start=1):
            render_node(root, 0, f"{idx}.")
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Assessment CRUD
    # ------------------------------------------------------------------

    async def _get_active(self, valuation_id: int):
        """Return the active assessment with this id, or ``None``."""
        return await self.model.filter(id=valuation_id, is_active=True).first()

    async def _to_out(self, valuation) -> ValuationAssessmentOut:
        """Convert an ORM assessment to its output schema, with user phone attached."""
        out = ValuationAssessmentOut.model_validate(valuation)
        return await self._attach_user_phone(out)

    async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut:
        """Create an assessment owned by ``user_id``."""
        create_data = data.model_dump()
        create_data['user_id'] = user_id
        valuation = await self.model.create(**create_data)
        return await self._to_out(valuation)

    async def get_by_id(self, valuation_id: int) -> Optional[ValuationAssessmentOut]:
        """Return the active assessment with this id, or ``None`` if absent."""
        valuation = await self._get_active(valuation_id)
        if not valuation:
            return None
        return await self._to_out(valuation)

    async def update(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
        """Update an active assessment with the fields explicitly set in ``data``.

        Returns ``None`` when the assessment does not exist. When no field is
        set, nothing is written.
        """
        valuation = await self._get_active(valuation_id)
        if not valuation:
            return None

        update_data = data.model_dump(exclude_unset=True)
        if update_data:
            # Supplying a certificate URL stamps the audit time.
            if update_data.get('certificate_url'):
                update_data['audited_at'] = datetime.now()
            valuation.update_from_dict(update_data)
            # NOTE(review): an update that does not carry "status" resets the
            # status to "pending" — confirm this is the intended workflow.
            valuation.status = update_data.get("status", "pending")
            if not getattr(valuation, "audited_at", None):
                valuation.audited_at = datetime.now()
            await valuation.save()
        return await self._to_out(valuation)

    async def delete(self, valuation_id: int) -> bool:
        """Soft-delete an assessment; return ``False`` if it does not exist."""
        valuation = await self._get_active(valuation_id)
        if not valuation:
            return False
        valuation.is_active = False
        await valuation.save()
        return True

    # ------------------------------------------------------------------
    # Listing, search, statistics
    # ------------------------------------------------------------------

    async def _paginate(self, queryset: QuerySet, page: int, size: int) -> ValuationAssessmentList:
        """Return one page of ``queryset``, newest first, with paging metadata.

        ``page`` and ``size`` are clamped to a minimum of 1 so that invalid
        values cannot produce a negative offset or a division by zero.
        """
        page = max(page, 1)
        size = max(size, 1)
        total = await queryset.count()
        valuations = await queryset.offset((page - 1) * size).limit(size).order_by('-created_at')
        items = [ValuationAssessmentOut.model_validate(v) for v in valuations]
        items = await self._attach_user_phone_bulk(items)
        return ValuationAssessmentList(
            items=items,
            total=total,
            page=page,
            size=size,
            pages=(total + size - 1) // size,  # ceiling division
        )

    async def get_list(self, query: ValuationAssessmentQuery) -> ValuationAssessmentList:
        """Return one page of assessments matching ``query``."""
        return await self._paginate(self._build_query(query), query.page, query.size)

    @staticmethod
    def _parse_time(value: Optional[str]) -> Optional[datetime]:
        """Parse a millisecond timestamp or an ISO-8601 string.

        Returns ``None`` for empty or unparseable input.
        """
        if not value:
            return None
        try:
            return datetime.fromtimestamp(int(value) / 1000)
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None

    def _build_query(self, query: ValuationAssessmentQuery) -> QuerySet:
        """Translate the filter fields of ``query`` into a queryset.

        Text fields are matched case-insensitively as substrings; time bounds
        accept millisecond timestamps or ISO-8601 strings and are ignored when
        unparseable.
        """
        queryset = self.model.filter(is_active=True)

        for field in ('asset_name', 'institution', 'industry', 'heritage_level'):
            value = getattr(query, field)
            if value:
                queryset = queryset.filter(**{f"{field}__icontains": value})

        if query.is_active is not None:
            queryset = queryset.filter(is_active=query.is_active)

        # status / phone / time bounds are read defensively with getattr.
        status = getattr(query, 'status', None)
        if status:
            queryset = queryset.filter(status=status)
        phone = getattr(query, 'phone', None)
        if phone:
            queryset = queryset.filter(user__phone__icontains=phone)

        # NOTE(review): the audited_* bounds filter on updated_at, not
        # audited_at — confirm which column is intended.
        time_bounds = (
            ('submitted_start', 'created_at__gte'),
            ('submitted_end', 'created_at__lte'),
            ('audited_start', 'updated_at__gte'),
            ('audited_end', 'updated_at__lte'),
        )
        for attr, lookup in time_bounds:
            moment = self._parse_time(getattr(query, attr, None))
            if moment:
                queryset = queryset.filter(**{lookup: moment})
        return queryset

    async def get_statistics(self) -> dict:
        """Return the active-assessment count plus counts per industry and heritage level."""
        total_count = await self.model.filter(is_active=True).count()
        industry_stats = await self.model.filter(is_active=True).group_by('industry').annotate(count=Count('id')).values('industry', 'count')
        heritage_level_stats = await self.model.filter(
            is_active=True,
            heritage_level__isnull=False
        ).group_by('heritage_level').annotate(count=Count('id')).values('heritage_level', 'count')
        return {
            'total_count': total_count,
            'industry_distribution': industry_stats,
            'heritage_level_distribution': heritage_level_stats
        }

    async def search(self, keyword: str, page: int = 1, size: int = 10) -> ValuationAssessmentList:
        """Return one page of active assessments whose name, institution,
        industry or heritage level contains ``keyword`` (case-insensitive)."""
        queryset = self.model.filter(
            Q(asset_name__icontains=keyword) |
            Q(institution__icontains=keyword) |
            Q(industry__icontains=keyword) |
            Q(heritage_level__icontains=keyword),
            is_active=True
        )
        return await self._paginate(queryset, page, size)

    # ------------------------------------------------------------------
    # Review actions
    # ------------------------------------------------------------------

    async def _apply_fields(self, valuation_id: int, fields: Dict[str, Any]) -> Optional[ValuationAssessmentOut]:
        """Write ``fields`` to an active assessment; ``None`` if it does not exist."""
        valuation = await self._get_active(valuation_id)
        if not valuation:
            return None
        await valuation.update_from_dict(fields).save()
        return await self._to_out(valuation)

    async def approve_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
        """Mark an assessment as reviewed-and-approved and stamp the audit time.

        NOTE(review): the stored status is "pending", which looks inconsistent
        with an approval — confirm the intended status value.
        """
        update_data: Dict[str, Any] = {"status": "pending", "audited_at": datetime.now()}
        if admin_notes:
            update_data["admin_notes"] = admin_notes
        return await self._apply_fields(valuation_id, update_data)

    async def reject_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
        """Mark an assessment as rejected and stamp the audit time."""
        update_data: Dict[str, Any] = {"status": "rejected", "audited_at": datetime.now()}
        if admin_notes:
            update_data["admin_notes"] = admin_notes
        return await self._apply_fields(valuation_id, update_data)

    async def update_admin_notes(self, valuation_id: int, admin_notes: str) -> Optional[ValuationAssessmentOut]:
        """Replace the administrator notes of an assessment."""
        return await self._apply_fields(valuation_id, {"admin_notes": admin_notes})

    # ------------------------------------------------------------------
    # User phone enrichment
    # ------------------------------------------------------------------

    async def _attach_user_phone(self, out: ValuationAssessmentOut) -> ValuationAssessmentOut:
        """Set ``out.user_phone`` from the owning user (``None`` if not found)."""
        user = await AppUser.filter(id=out.user_id).first()
        out.user_phone = getattr(user, "phone", None) if user else None
        return out

    async def _attach_user_phone_bulk(self, items: List[ValuationAssessmentOut]) -> List[ValuationAssessmentOut]:
        """Set ``user_phone`` on every item using a single user query."""
        ids = list({item.user_id for item in items if item.user_id})
        if not ids:
            return items
        users = await AppUser.filter(id__in=ids).values("id", "phone")
        phone_map = {u["id"]: u["phone"] for u in users}
        for item in items:
            item.user_phone = phone_map.get(item.user_id)
        return items
# Module-level controller instance created at import time.
valuation_controller = ValuationController()
# NOTE(review): `logger` is imported at the very end of the module, after the
# class that uses it. This works because the methods look the name up at call
# time. Presumably the late import avoids a circular import — confirm before
# moving it to the top of the file.
from app.log import logger