import json from typing import Any, Dict, List, Optional from tortoise.expressions import Q from tortoise.queryset import QuerySet from tortoise.functions import Count from app.models.valuation import ValuationAssessment, ValuationCalculationStep from app.schemas.valuation import ( ValuationAssessmentCreate, ValuationAssessmentUpdate, ValuationAssessmentQuery, ValuationAssessmentOut, ValuationAssessmentList, ValuationCalculationStepCreate, ValuationCalculationStepOut ) from app.models.user import AppUser from app.utils.calculation_engine.formula_registry import get_formula_meta class ValuationController: """估值评估控制器""" model = ValuationAssessment step_model = ValuationCalculationStep async def create_calculation_step(self, data: ValuationCalculationStepCreate) -> ValuationCalculationStepOut: """ 创建估值计算步骤 Args: data (ValuationCalculationStepCreate): 估值计算步骤数据 Returns: ValuationCalculationStepOut: 创建的估值计算步骤 """ step = await self.step_model.create(**data.model_dump()) logger.info( "calcstep.create valuation_id={} order={} name={}", data.valuation_id, data.step_order, data.step_name, ) return ValuationCalculationStepOut.model_validate(step) async def log_formula_step( self, valuation_id: int, formula_code: str, *, status: str = "processing", input_params: Optional[Dict[str, Any]] = None, output_result: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None, step_description: Optional[str] = None, duration_ms: Optional[int] = None, ) -> ValuationCalculationStepOut: """ 幂等记录(或更新)某个公式节点的计算过程。 """ meta = get_formula_meta(formula_code) description = step_description or meta.formula create_payload: Dict[str, Any] = { "valuation_id": valuation_id, "formula_code": meta.code, "formula_name": meta.name, "formula_text": meta.formula, "parent_formula_code": meta.parent_code, "group_code": meta.group_code, "step_order": meta.order, "step_name": meta.name, "step_description": description, "status": status, } if input_params is not None: create_payload["input_params"] = input_params if output_result is not None: create_payload["output_result"] = output_result if error_message is not None: create_payload["error_message"] = error_message # 准备更新字段 update_fields: Dict[str, Any] = { "status": status, "step_description": description, "formula_name": meta.name, "formula_text": meta.formula, "parent_formula_code": meta.parent_code, "group_code": meta.group_code, "step_order": meta.order, "step_name": meta.name, } if input_params is not None: update_fields["input_params"] = input_params if output_result is not None: update_fields["output_result"] = output_result if error_message is not None: update_fields["error_message"] = error_message if duration_ms is not None: result = update_fields.get("output_result") or {} if not isinstance(result, dict): result = {} result["duration_ms"] = duration_ms update_fields["output_result"] = result # 先尝试查询是否存在(明确排除 formula_code 为 NULL 的情况) step = await self.step_model.filter( valuation_id=valuation_id, formula_code=meta.code ).first() # 如果没找到,再检查是否有 formula_code 为 NULL 的旧记录(不应该有,但为了安全) if not step and meta.code: # 检查是否有重复的旧记录(formula_code 为 NULL) old_steps = await self.step_model.filter( valuation_id=valuation_id, formula_code__isnull=True ).all() if old_steps: logger.warning( "calcstep.log_formula found old records with NULL formula_code: valuation_id={} count={}", valuation_id, len(old_steps), ) logger.info( "calcstep.log_formula query: valuation_id={} formula_code={} found={}", valuation_id, meta.code, step is not None, ) if step: # 更新现有记录 await step.update_from_dict(update_fields).save() logger.info( "calcstep.log_formula updated valuation_id={} formula_code={}", valuation_id, meta.code, ) else: # 尝试创建新记录 if duration_ms is not None: result = create_payload.setdefault("output_result", {}) or {} if not isinstance(result, dict): result = {} result["duration_ms"] = duration_ms create_payload["output_result"] = result try: step = await self.step_model.create(**create_payload) logger.info( "calcstep.log_formula created valuation_id={} formula_code={}", valuation_id, meta.code, ) except Exception as e: # 如果因为唯一约束冲突而失败(可能是并发插入),重新查询并更新 error_str = str(e).lower() if "duplicate" in error_str or "unique" in error_str or "1062" in error_str: logger.warning( "calcstep.log_formula duplicate key detected, retrying query: {}", str(e), ) # 重新查询(可能已被其他请求插入) step = await self.step_model.filter( valuation_id=valuation_id, formula_code=meta.code ).first() if step: # 更新刚插入的记录 await step.update_from_dict(update_fields).save() logger.info( "calcstep.log_formula updated after duplicate key: valuation_id={} formula_code={}", valuation_id, meta.code, ) else: # 如果还是找不到,记录错误但继续 logger.error( "calcstep.log_formula failed to find record after duplicate key error: valuation_id={} formula_code={}", valuation_id, meta.code, ) raise else: # 其他错误直接抛出 raise return ValuationCalculationStepOut.model_validate(step) async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut: step = await self.step_model.filter(id=step_id).first() if not step: raise ValueError(f"calculation_step not found: {step_id}") await step.update_from_dict(update).save() logger.info( "calcstep.update id={} fields={}", step_id, list(update.keys()), ) return ValuationCalculationStepOut.model_validate(step) async def get_calculation_steps(self, valuation_id: int) -> List[ValuationCalculationStepOut]: """ 根据估值ID获取所有相关的计算步骤。 此方法从数据库中检索与特定估值ID关联的所有计算步骤记录, 并按创建时间升序排序,确保步骤的顺序正确。 Args: valuation_id (int): 估值的唯一标识符。 Returns: List[ValuationCalculationStepOut]: 一个包含所有相关计算步骤的列表, 如果找不到任何步骤,则返回空列表。 """ steps = await self.step_model.filter(valuation_id=valuation_id).order_by('created_at') logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps)) return [ValuationCalculationStepOut.model_validate(step) for step in steps] async def get_calculation_report_markdown(self, valuation_id: int) -> str: """ 根据估值ID生成计算过程的 Markdown 报告。 此方法会查询所有相关的计算步骤,按照公式的层级关系组织, 并生成格式化的 Markdown 文档,包含: - 公式名称和说明 - 输入参数 - 输出结果 - 计算状态 - 错误信息(如果有) Args: valuation_id (int): 估值的唯一标识符。 Returns: str: Markdown 格式的计算报告。 Raises: ValueError: 如果找不到对应的估值记录。 """ # 验证估值记录是否存在 valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: raise ValueError(f"估值记录不存在: {valuation_id}") # 获取所有计算步骤 steps = await self.step_model.filter(valuation_id=valuation_id).order_by('step_order') if not steps: return f"# 估值计算报告\n\n**估值ID**: {valuation_id}\n\n**资产名称**: {valuation.asset_name}\n\n> 暂无计算步骤记录。\n" # 转换为字典列表,便于处理 steps_data = [] for step in steps: step_dict = ValuationCalculationStepOut.model_validate(step).model_dump() steps_data.append(step_dict) # 构建公式树形结构 formula_tree = self._build_formula_tree(steps_data) # 生成 Markdown markdown = self._generate_markdown(valuation, formula_tree) logger.info("calcstep.report_markdown generated valuation_id={} steps_count={}", valuation_id, len(steps_data)) return markdown def _build_formula_tree(self, steps: List[Dict]) -> Dict: """ 构建公式的树形结构。 Args: steps: 计算步骤列表。 Returns: Dict: 树形结构的字典,key 为 formula_code,value 为步骤数据和子节点。 """ # 按 formula_code 索引 step_map = {} for step in steps: code = step.get('formula_code') if code: step_map[code] = step # 构建树形结构 tree = {} processed = set() # 第一遍:创建所有节点 for step in steps: code = step.get('formula_code') if not code or code in processed: continue node = { 'step': step, 'children': [] } tree[code] = node processed.add(code) # 第二遍:建立父子关系 root_nodes = [] for step in steps: code = step.get('formula_code') if not code: continue parent_code = step.get('parent_formula_code') node = tree[code] if parent_code and parent_code in tree: # 有父节点,添加到父节点的 children tree[parent_code]['children'].append(node) else: # 根节点 root_nodes.append(node) # 按 step_order 排序 def sort_nodes(nodes): nodes.sort(key=lambda n: float(n['step'].get('step_order', 0))) for node in nodes: if node['children']: sort_nodes(node['children']) sort_nodes(root_nodes) return {'roots': root_nodes, 'all': tree} def _generate_markdown(self, valuation, formula_tree: Dict) -> str: """ 生成 Markdown 格式的报告。 Args: valuation: 估值评估对象。 formula_tree: 公式树形结构。 Returns: str: Markdown 格式的字符串。 """ lines = [] # 标题和基本信息 lines.append("# 估值计算报告") lines.append("") lines.append("## 基本信息") lines.append("") lines.append("| 字段 | 值 |") lines.append("|------|----|") lines.append(f"| 估值ID | {valuation.id} |") lines.append(f"| 资产名称 | {valuation.asset_name} |") lines.append(f"| 所属机构 | {valuation.institution} |") lines.append(f"| 所属行业 | {valuation.industry} |") heritage_value = valuation.heritage_level if valuation.heritage_level else "-" lines.append(f"| 非遗等级 | {heritage_value} |") created_at_str = valuation.created_at.strftime("%Y-%m-%d %H:%M:%S") if valuation.created_at else "N/A" lines.append(f"| 创建时间 | {created_at_str} |") lines.append("") # 计算结果摘要 if valuation.final_value_ab is not None: lines.append("## 计算结果摘要") lines.append("") lines.append("| 项目 | 数值(万元) |") lines.append("|------|-------------|") if valuation.model_value_b is not None: lines.append(f"| 模型估值B | {valuation.model_value_b:.2f} |") if valuation.market_value_c is not None: lines.append(f"| 市场估值C | {valuation.market_value_c:.2f} |") lines.append(f"| **最终估值AB** | **{valuation.final_value_ab:.2f}** |") if valuation.dynamic_pledge_rate is not None: lines.append(f"| 动态质押率 | {valuation.dynamic_pledge_rate:.4f} |") lines.append("") # 详细计算过程 lines.append("## 详细计算过程") lines.append("") def _format_json_block(value: Any, indent_prefix: str = "") -> List[str]: """格式化 JSON 代码块""" json_text = json.dumps(value, ensure_ascii=False, indent=2) block_lines = [f"{indent_prefix}```json"] for line in json_text.splitlines(): block_lines.append(f"{indent_prefix}{line}") block_lines.append(f"{indent_prefix}```") return block_lines # 递归生成公式树 heading_levels = ["####", "#####", "######", "######", "######"] def render_node(node: Dict, level: int = 0, prefix: str = ""): step = node['step'] heading = heading_levels[min(level, len(heading_levels) - 1)] name = step.get('formula_name', step.get('step_name', '未知')) formula_text = step.get('formula_text', step.get('step_description', '')) status = step.get('status', 'unknown') input_params = step.get('input_params') output_result = step.get('output_result') error_message = step.get('error_message') duration_ms = None if output_result and isinstance(output_result, dict): duration_ms = output_result.get('duration_ms') lines.append(f"{heading} {prefix}{name}") lines.append("") # 公式说明 if formula_text: if level == 0: lines.append(f"> {formula_text}") lines.append("") else: lines.append("计算公式:") lines.append(f"`{formula_text}`") lines.append("") # 状态和耗时 status_label = { 'processing': '计算中', 'completed': '已完成', 'failed': '计算失败' }.get(status, status) lines.append(f"**状态**: {status_label}") if duration_ms is not None: lines.append(f"**耗时**: {duration_ms}ms") lines.append("") # 输入参数 if input_params: lines.append("**输入参数**:") lines.extend(_format_json_block(input_params, "")) lines.append("") # 输出结果 if output_result: # 移除 duration_ms,因为已经在状态中显示了 result_display = {k: v for k, v in output_result.items() if k != 'duration_ms'} if result_display: lines.append("**输出结果**:") lines.extend(_format_json_block(result_display, "")) lines.append("") # 错误信息 if error_message: lines.append(f"> ⚠️ **错误**: {error_message}") lines.append("") # 子节点 children = node.get('children', []) if children: for idx, child in enumerate(children, start=1): lines.append("") child_prefix = f"{prefix}{idx}." render_node(child, level + 1, child_prefix) lines.append("") # 渲染所有根节点 for idx, root in enumerate(formula_tree['roots'], start=1): prefix = f"{idx}." render_node(root, 0, prefix) return "\n".join(lines) async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut: """创建估值评估""" # 将用户ID添加到数据中 create_data = data.model_dump() create_data['user_id'] = user_id valuation = await self.model.create(**create_data) out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) async def get_by_id(self, valuation_id: int) -> Optional[ValuationAssessmentOut]: """根据ID获取估值评估""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if valuation: out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) return None async def update(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]: """更新估值评估""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: return None update_data = data.model_dump(exclude_unset=True) if update_data: if 'certificate_url' in update_data and update_data.get('certificate_url'): from datetime import datetime update_data['audited_at'] = datetime.now() await valuation.update_from_dict(update_data) await valuation.save() from datetime import datetime valuation.status = "success" if not getattr(valuation, "audited_at", None): valuation.audited_at = datetime.now() await valuation.save() out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) async def delete(self, valuation_id: int) -> bool: """软删除估值评估""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: return False valuation.is_active = False await valuation.save() return True async def get_list(self, query: ValuationAssessmentQuery) -> ValuationAssessmentList: """获取估值评估列表""" queryset = self._build_query(query) # 计算总数 total = await queryset.count() # 分页查询 offset = (query.page - 1) * query.size valuations = await queryset.offset(offset).limit(query.size).order_by('-created_at') # 转换为输出模型 items = [ValuationAssessmentOut.model_validate(v) for v in valuations] items = await self._attach_user_phone_bulk(items) # 计算总页数 pages = (total + query.size - 1) // query.size return ValuationAssessmentList( items=items, total=total, page=query.page, size=query.size, pages=pages ) def _build_query(self, query: ValuationAssessmentQuery) -> QuerySet: """构建查询条件""" queryset = self.model.filter(is_active=True) if query.asset_name: queryset = queryset.filter(asset_name__icontains=query.asset_name) if query.institution: queryset = queryset.filter(institution__icontains=query.institution) if query.industry: queryset = queryset.filter(industry__icontains=query.industry) if query.heritage_level: queryset = queryset.filter(heritage_level__icontains=query.heritage_level) if query.is_active is not None: queryset = queryset.filter(is_active=query.is_active) # 添加状态筛选 if hasattr(query, 'status') and query.status: queryset = queryset.filter(status=query.status) if getattr(query, 'phone', None): queryset = queryset.filter(user__phone__icontains=query.phone) def _parse_time(v: Optional[str]): if not v: return None try: iv = int(v) from datetime import datetime return datetime.fromtimestamp(iv / 1000) except Exception: try: from datetime import datetime return datetime.fromisoformat(v) except Exception: return None s_dt = _parse_time(getattr(query, 'submitted_start', None)) e_dt = _parse_time(getattr(query, 'submitted_end', None)) if s_dt: queryset = queryset.filter(created_at__gte=s_dt) if e_dt: queryset = queryset.filter(created_at__lte=e_dt) a_s_dt = _parse_time(getattr(query, 'audited_start', None)) a_e_dt = _parse_time(getattr(query, 'audited_end', None)) if a_s_dt: queryset = queryset.filter(updated_at__gte=a_s_dt) if a_e_dt: queryset = queryset.filter(updated_at__lte=a_e_dt) return queryset async def get_statistics(self) -> dict: """获取统计信息""" total_count = await self.model.filter(is_active=True).count() # 按行业统计 industry_stats = await self.model.filter(is_active=True).group_by('industry').annotate(count=Count('id')).values('industry', 'count') # 按非遗等级统计 heritage_level_stats = await self.model.filter( is_active=True, heritage_level__isnull=False ).group_by('heritage_level').annotate(count=Count('id')).values('heritage_level', 'count') return { 'total_count': total_count, 'industry_distribution': industry_stats, 'heritage_level_distribution': heritage_level_stats } async def search(self, keyword: str, page: int = 1, size: int = 10) -> ValuationAssessmentList: """全文搜索""" queryset = self.model.filter( Q(asset_name__icontains=keyword) | Q(institution__icontains=keyword) | Q(industry__icontains=keyword) | Q(heritage_level__icontains=keyword), is_active=True ) # 计算总数 total = await queryset.count() # 分页查询 offset = (page - 1) * size valuations = await queryset.offset(offset).limit(size).order_by('-created_at') # 转换为输出模型 items = [ValuationAssessmentOut.model_validate(v) for v in valuations] items = await self._attach_user_phone_bulk(items) # 计算总页数 pages = (total + size - 1) // size return ValuationAssessmentList( items=items, total=total, page=page, size=size, pages=pages ) async def approve_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]: """审核通过估值评估""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: return None from datetime import datetime update_data = {"status": "pending", "audited_at": datetime.now()} if admin_notes: update_data["admin_notes"] = admin_notes await valuation.update_from_dict(update_data).save() out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) async def reject_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]: """审核拒绝估值评估""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: return None from datetime import datetime update_data = {"status": "rejected", "audited_at": datetime.now()} if admin_notes: update_data["admin_notes"] = admin_notes await valuation.update_from_dict(update_data).save() out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) async def update_admin_notes(self, valuation_id: int, admin_notes: str) -> Optional[ValuationAssessmentOut]: """更新管理员备注""" valuation = await self.model.filter(id=valuation_id, is_active=True).first() if not valuation: return None await valuation.update_from_dict({"admin_notes": admin_notes}).save() out = ValuationAssessmentOut.model_validate(valuation) return await self._attach_user_phone(out) async def _attach_user_phone(self, out: ValuationAssessmentOut) -> ValuationAssessmentOut: user = await AppUser.filter(id=out.user_id).first() out.user_phone = getattr(user, "phone", None) if user else None return out async def _attach_user_phone_bulk(self, items: List[ValuationAssessmentOut]) -> List[ValuationAssessmentOut]: ids = list({item.user_id for item in items if item.user_id}) if not ids: return items users = await AppUser.filter(id__in=ids).values("id", "phone") phone_map = {u["id"]: u["phone"] for u in users} for item in items: item.user_phone = phone_map.get(item.user_id) return items # 创建控制器实例 valuation_controller = ValuationController() from app.log import logger