* 'main' of https://git.1024tool.vip/zfc/guzhi:
  up repo md
  refactor(valuation): 重构评估状态逻辑并添加创建时间筛选
This commit is contained in:
Wei_佳 2025-11-26 17:47:30 +08:00
commit 5059e57f19
14 changed files with 1319 additions and 335 deletions

View File

@ -60,6 +60,33 @@ async def get_valuation_steps(valuation_id: int):
return Success(data=steps_out, msg="获取计算步骤成功") return Success(data=steps_out, msg="获取计算步骤成功")
@valuations_router.get("/{valuation_id}/report", summary="获取估值计算报告Markdown格式")
async def get_valuation_report(valuation_id: int):
"""
根据估值ID生成计算过程的 Markdown 报告
返回格式化的 Markdown 文档包含
- 估值基本信息
- 计算结果摘要
- 详细计算过程按公式层级组织
- 每个公式的输入参数输出结果状态等信息
"""
try:
markdown = await valuation_controller.get_calculation_report_markdown(valuation_id)
from fastapi import Response
return Response(
content=markdown,
media_type="text/markdown; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="valuation_report_{valuation_id}.md"'
}
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"生成报告失败: {str(e)}")
@valuations_router.put("/{valuation_id}", summary="更新估值评估", response_model=BasicResponse[ValuationAssessmentOut]) @valuations_router.put("/{valuation_id}", summary="更新估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def update_valuation(valuation_id: int, data: ValuationAssessmentUpdate): async def update_valuation(valuation_id: int, data: ValuationAssessmentUpdate):
"""更新估值评估记录""" """更新估值评估记录"""

View File

@ -1,4 +1,5 @@
from typing import List, Optional import json
from typing import Any, Dict, List, Optional
from tortoise.expressions import Q from tortoise.expressions import Q
from tortoise.queryset import QuerySet from tortoise.queryset import QuerySet
from tortoise.functions import Count from tortoise.functions import Count
@ -14,6 +15,7 @@ from app.schemas.valuation import (
ValuationCalculationStepOut ValuationCalculationStepOut
) )
from app.models.user import AppUser from app.models.user import AppUser
from app.utils.calculation_engine.formula_registry import get_formula_meta
class ValuationController: class ValuationController:
@ -41,6 +43,152 @@ class ValuationController:
) )
return ValuationCalculationStepOut.model_validate(step) return ValuationCalculationStepOut.model_validate(step)
async def log_formula_step(
self,
valuation_id: int,
formula_code: str,
*,
status: str = "processing",
input_params: Optional[Dict[str, Any]] = None,
output_result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
step_description: Optional[str] = None,
duration_ms: Optional[int] = None,
) -> ValuationCalculationStepOut:
"""
幂等记录或更新某个公式节点的计算过程
"""
meta = get_formula_meta(formula_code)
description = step_description or meta.formula
create_payload: Dict[str, Any] = {
"valuation_id": valuation_id,
"formula_code": meta.code,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
"step_description": description,
"status": status,
}
if input_params is not None:
create_payload["input_params"] = input_params
if output_result is not None:
create_payload["output_result"] = output_result
if error_message is not None:
create_payload["error_message"] = error_message
# 准备更新字段
update_fields: Dict[str, Any] = {
"status": status,
"step_description": description,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
}
if input_params is not None:
update_fields["input_params"] = input_params
if output_result is not None:
update_fields["output_result"] = output_result
if error_message is not None:
update_fields["error_message"] = error_message
if duration_ms is not None:
result = update_fields.get("output_result") or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
update_fields["output_result"] = result
# 先尝试查询是否存在(明确排除 formula_code 为 NULL 的情况)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
# 如果没找到,再检查是否有 formula_code 为 NULL 的旧记录(不应该有,但为了安全)
if not step and meta.code:
# 检查是否有重复的旧记录formula_code 为 NULL
old_steps = await self.step_model.filter(
valuation_id=valuation_id,
formula_code__isnull=True
).all()
if old_steps:
logger.warning(
"calcstep.log_formula found old records with NULL formula_code: valuation_id={} count={}",
valuation_id,
len(old_steps),
)
logger.info(
"calcstep.log_formula query: valuation_id={} formula_code={} found={}",
valuation_id,
meta.code,
step is not None,
)
if step:
# 更新现有记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 尝试创建新记录
if duration_ms is not None:
result = create_payload.setdefault("output_result", {}) or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
create_payload["output_result"] = result
try:
step = await self.step_model.create(**create_payload)
logger.info(
"calcstep.log_formula created valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
except Exception as e:
# 如果因为唯一约束冲突而失败(可能是并发插入),重新查询并更新
error_str = str(e).lower()
if "duplicate" in error_str or "unique" in error_str or "1062" in error_str:
logger.warning(
"calcstep.log_formula duplicate key detected, retrying query: {}",
str(e),
)
# 重新查询(可能已被其他请求插入)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
if step:
# 更新刚插入的记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated after duplicate key: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 如果还是找不到,记录错误但继续
logger.error(
"calcstep.log_formula failed to find record after duplicate key error: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
raise
else:
# 其他错误直接抛出
raise
return ValuationCalculationStepOut.model_validate(step)
async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut: async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut:
step = await self.step_model.filter(id=step_id).first() step = await self.step_model.filter(id=step_id).first()
if not step: if not step:
@ -71,6 +219,247 @@ class ValuationController:
logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps)) logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps))
return [ValuationCalculationStepOut.model_validate(step) for step in steps] return [ValuationCalculationStepOut.model_validate(step) for step in steps]
async def get_calculation_report_markdown(self, valuation_id: int) -> str:
"""
根据估值ID生成计算过程的 Markdown 报告
此方法会查询所有相关的计算步骤按照公式的层级关系组织
并生成格式化的 Markdown 文档包含
- 公式名称和说明
- 输入参数
- 输出结果
- 计算状态
- 错误信息如果有
Args:
valuation_id (int): 估值的唯一标识符
Returns:
str: Markdown 格式的计算报告
Raises:
ValueError: 如果找不到对应的估值记录
"""
# 验证估值记录是否存在
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
raise ValueError(f"估值记录不存在: {valuation_id}")
# 获取所有计算步骤
steps = await self.step_model.filter(valuation_id=valuation_id).order_by('step_order')
if not steps:
return f"# 估值计算报告\n\n**估值ID**: {valuation_id}\n\n**资产名称**: {valuation.asset_name}\n\n> 暂无计算步骤记录。\n"
# 转换为字典列表,便于处理
steps_data = []
for step in steps:
step_dict = ValuationCalculationStepOut.model_validate(step).model_dump()
steps_data.append(step_dict)
# 构建公式树形结构
formula_tree = self._build_formula_tree(steps_data)
# 生成 Markdown
markdown = self._generate_markdown(valuation, formula_tree)
logger.info("calcstep.report_markdown generated valuation_id={} steps_count={}", valuation_id, len(steps_data))
return markdown
def _build_formula_tree(self, steps: List[Dict]) -> Dict:
"""
构建公式的树形结构
Args:
steps: 计算步骤列表
Returns:
Dict: 树形结构的字典key formula_codevalue 为步骤数据和子节点
"""
# 按 formula_code 索引
step_map = {}
for step in steps:
code = step.get('formula_code')
if code:
step_map[code] = step
# 构建树形结构
tree = {}
processed = set()
# 第一遍:创建所有节点
for step in steps:
code = step.get('formula_code')
if not code or code in processed:
continue
node = {
'step': step,
'children': []
}
tree[code] = node
processed.add(code)
# 第二遍:建立父子关系
root_nodes = []
for step in steps:
code = step.get('formula_code')
if not code:
continue
parent_code = step.get('parent_formula_code')
node = tree[code]
if parent_code and parent_code in tree:
# 有父节点,添加到父节点的 children
tree[parent_code]['children'].append(node)
else:
# 根节点
root_nodes.append(node)
# 按 step_order 排序
def sort_nodes(nodes):
nodes.sort(key=lambda n: float(n['step'].get('step_order', 0)))
for node in nodes:
if node['children']:
sort_nodes(node['children'])
sort_nodes(root_nodes)
return {'roots': root_nodes, 'all': tree}
def _generate_markdown(self, valuation, formula_tree: Dict) -> str:
"""
生成 Markdown 格式的报告
Args:
valuation: 估值评估对象
formula_tree: 公式树形结构
Returns:
str: Markdown 格式的字符串
"""
lines = []
# 标题和基本信息
lines.append("# 估值计算报告")
lines.append("")
lines.append("## 基本信息")
lines.append("")
lines.append("| 字段 | 值 |")
lines.append("|------|----|")
lines.append(f"| 估值ID | {valuation.id} |")
lines.append(f"| 资产名称 | {valuation.asset_name} |")
lines.append(f"| 所属机构 | {valuation.institution} |")
lines.append(f"| 所属行业 | {valuation.industry} |")
heritage_value = valuation.heritage_level if valuation.heritage_level else "-"
lines.append(f"| 非遗等级 | {heritage_value} |")
created_at_str = valuation.created_at.strftime("%Y-%m-%d %H:%M:%S") if valuation.created_at else "N/A"
lines.append(f"| 创建时间 | {created_at_str} |")
lines.append("")
# 计算结果摘要
if valuation.final_value_ab is not None:
lines.append("## 计算结果摘要")
lines.append("")
lines.append("| 项目 | 数值(万元) |")
lines.append("|------|-------------|")
if valuation.model_value_b is not None:
lines.append(f"| 模型估值B | {valuation.model_value_b:.2f} |")
if valuation.market_value_c is not None:
lines.append(f"| 市场估值C | {valuation.market_value_c:.2f} |")
lines.append(f"| **最终估值AB** | **{valuation.final_value_ab:.2f}** |")
if valuation.dynamic_pledge_rate is not None:
lines.append(f"| 动态质押率 | {valuation.dynamic_pledge_rate:.4f} |")
lines.append("")
# 详细计算过程
lines.append("## 详细计算过程")
lines.append("")
def _format_json_block(value: Any, indent_prefix: str = "") -> List[str]:
"""格式化 JSON 代码块"""
json_text = json.dumps(value, ensure_ascii=False, indent=2)
block_lines = [f"{indent_prefix}```json"]
for line in json_text.splitlines():
block_lines.append(f"{indent_prefix}{line}")
block_lines.append(f"{indent_prefix}```")
return block_lines
# 递归生成公式树
heading_levels = ["####", "#####", "######", "######", "######"]
def render_node(node: Dict, level: int = 0, prefix: str = ""):
step = node['step']
heading = heading_levels[min(level, len(heading_levels) - 1)]
name = step.get('formula_name', step.get('step_name', '未知'))
formula_text = step.get('formula_text', step.get('step_description', ''))
status = step.get('status', 'unknown')
input_params = step.get('input_params')
output_result = step.get('output_result')
error_message = step.get('error_message')
duration_ms = None
if output_result and isinstance(output_result, dict):
duration_ms = output_result.get('duration_ms')
lines.append(f"{heading} {prefix}{name}")
lines.append("")
# 公式说明
if formula_text:
if level == 0:
lines.append(f"> {formula_text}")
lines.append("")
else:
lines.append("计算公式:")
lines.append(f"`{formula_text}`")
lines.append("")
# 状态和耗时
status_label = {
'processing': '计算中',
'completed': '已完成',
'failed': '计算失败'
}.get(status, status)
lines.append(f"**状态**: {status_label}")
if duration_ms is not None:
lines.append(f"**耗时**: {duration_ms}ms")
lines.append("")
# 输入参数
if input_params:
lines.append("**输入参数**:")
lines.extend(_format_json_block(input_params, ""))
lines.append("")
# 输出结果
if output_result:
# 移除 duration_ms因为已经在状态中显示了
result_display = {k: v for k, v in output_result.items() if k != 'duration_ms'}
if result_display:
lines.append("**输出结果**:")
lines.extend(_format_json_block(result_display, ""))
lines.append("")
# 错误信息
if error_message:
lines.append(f"> ⚠️ **错误**: {error_message}")
lines.append("")
# 子节点
children = node.get('children', [])
if children:
for idx, child in enumerate(children, start=1):
lines.append("")
child_prefix = f"{prefix}{idx}."
render_node(child, level + 1, child_prefix)
lines.append("")
# 渲染所有根节点
for idx, root in enumerate(formula_tree['roots'], start=1):
prefix = f"{idx}."
render_node(root, 0, prefix)
return "\n".join(lines)
async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut: async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut:
"""创建估值评估""" """创建估值评估"""
# 将用户ID添加到数据中 # 将用户ID添加到数据中

View File

@ -284,7 +284,90 @@ async def init_apis():
await api_controller.refresh_api() await api_controller.refresh_api()
async def _ensure_unique_index():
"""确保 valuation_calculation_steps 表的唯一索引存在"""
try:
conn_alias = settings.TORTOISE_ORM["apps"]["models"]["default_connection"]
from tortoise import connections
conn = connections.get(conn_alias)
# 检查表是否存在
result = await conn.execute_query(
"SHOW TABLES LIKE 'valuation_calculation_steps'"
)
if not result or len(result[1]) == 0:
logger.info("Table valuation_calculation_steps does not exist, skipping index check")
return
# 检查唯一索引是否存在
# 查找包含 valuation_id 和 formula_code 的唯一索引
index_result = await conn.execute_query(
"SHOW INDEX FROM `valuation_calculation_steps` WHERE Non_unique = 0 AND Column_name IN ('valuation_id', 'formula_code')"
)
# 查找是否存在 (valuation_id, formula_code) 的唯一索引
# 对于复合索引SHOW INDEX 会返回多行,每行对应一个列
# 需要检查是否有同一个 Key_name 包含两个列
has_unique_index = False
if index_result and len(index_result) > 1:
# 按 Key_name 分组
index_groups = {}
for row in index_result[1]:
if len(row) >= 5:
key_name = row[2] if len(row) > 2 else ""
non_unique = row[1] if len(row) > 1 else 1
column_name = row[4] if len(row) > 4 else ""
seq_in_index = row[3] if len(row) > 3 else 0
if non_unique == 0 and column_name in ('valuation_id', 'formula_code'):
if key_name not in index_groups:
index_groups[key_name] = []
index_groups[key_name].append(column_name)
# 检查是否有索引包含两个列
for key_name, columns in index_groups.items():
if 'valuation_id' in columns and 'formula_code' in columns:
has_unique_index = True
logger.debug(f"Found unique index: {key_name} on (valuation_id, formula_code)")
break
if not has_unique_index:
logger.warning("Unique index on (valuation_id, formula_code) not found, attempting to create...")
try:
# 先删除可能存在的重复记录
await conn.execute_query("""
DELETE t1 FROM `valuation_calculation_steps` t1
INNER JOIN `valuation_calculation_steps` t2
WHERE t1.id > t2.id
AND t1.valuation_id = t2.valuation_id
AND t1.formula_code = t2.formula_code
AND t1.formula_code IS NOT NULL
""")
logger.info("Cleaned up duplicate records")
# 创建唯一索引
await conn.execute_query("""
CREATE UNIQUE INDEX `uidx_valuation_formula`
ON `valuation_calculation_steps` (`valuation_id`, `formula_code`)
""")
logger.info("Created unique index on (valuation_id, formula_code)")
except Exception as idx_err:
error_str = str(idx_err).lower()
if "duplicate key name" in error_str or "already exists" in error_str:
logger.info("Unique index already exists (different name)")
else:
logger.warning(f"Failed to create unique index: {idx_err}")
else:
logger.debug("Unique index on (valuation_id, formula_code) already exists")
except Exception as e:
logger.warning(f"Failed to ensure unique index: {e}")
async def init_db(): async def init_db():
import os
from pathlib import Path
from tortoise import Tortoise
from tortoise.exceptions import OperationalError
command = Command(tortoise_config=settings.TORTOISE_ORM) command = Command(tortoise_config=settings.TORTOISE_ORM)
try: try:
await command.init_db(safe=True) await command.init_db(safe=True)
@ -292,14 +375,84 @@ async def init_db():
pass pass
await command.init() await command.init()
# 检查并清理可能冲突的迁移文件(避免交互式提示)
# Aerich 在检测到迁移文件已存在时会交互式提示,我们提前删除冲突文件
migrations_dir = Path("migrations/models")
if migrations_dir.exists():
# 查找包含 "update" 的迁移文件(通常是自动生成的冲突文件)
for migration_file in migrations_dir.glob("*update*.py"):
if migration_file.name != "__init__.py":
logger.info(f"Removing conflicting migration file: {migration_file.name}")
migration_file.unlink()
# 尝试执行 migrate
try: try:
await command.migrate() await command.migrate()
except AttributeError: except AttributeError:
logger.warning("unable to retrieve model history from database, model history will be created from scratch") logger.warning("unable to retrieve model history from database, model history will be created from scratch")
shutil.rmtree("migrations") shutil.rmtree("migrations")
await command.init_db(safe=True) await command.init_db(safe=True)
except Exception as e:
# 如果 migrate 失败,记录警告但继续执行 upgrade
logger.warning(f"Migrate failed: {e}, continuing with upgrade...")
await command.upgrade(run_in_transaction=True) # 在 upgrade 之前,先检查表是否存在,如果不存在则先创建表
try:
await command.upgrade(run_in_transaction=True)
# upgrade 成功后,验证并修复唯一索引
await _ensure_unique_index()
except (OperationalError, Exception) as e:
error_msg = str(e)
# 如果是因为表不存在而失败,先让 Tortoise 生成表结构
if "doesn't exist" in error_msg.lower() or ("table" in error_msg.lower() and "valuation_calculation_steps" in error_msg):
logger.warning(f"Table not found during upgrade: {error_msg}, generating schemas first...")
# 确保 Tortoise 已初始化Aerich 的 init 应该已经初始化了,但为了安全再检查)
try:
# 生成表结构safe=True 表示如果表已存在则跳过)
await Tortoise.generate_schemas(safe=True)
logger.info("Tables generated successfully, retrying upgrade...")
# 重新尝试 upgrade这次应该会成功因为表已经存在
try:
await command.upgrade(run_in_transaction=True)
except Exception as upgrade_err:
# 如果 upgrade 仍然失败,可能是迁移文件的问题,记录警告但继续
logger.warning(f"Upgrade still failed after generating schemas: {upgrade_err}, continuing anyway...")
except Exception as gen_err:
logger.error(f"Failed to generate schemas: {gen_err}")
raise
# 如果是重复字段错误,说明迁移已经执行过,直接跳过并确保索引
elif "duplicate column name" in error_msg.lower():
logger.warning(f"Duplicate column detected during upgrade: {error_msg}, skipping migration step and ensuring schema integrity...")
await _ensure_unique_index()
# 如果是重复索引错误,删除表并重新创建(最简单可靠的方法)
elif "duplicate key" in error_msg.lower() or "duplicate key name" in error_msg.lower():
logger.warning(f"Duplicate index detected: {error_msg}, dropping and recreating table...")
try:
# Aerich 的 command.init() 已经初始化了 Tortoise直接使用连接
# 连接别名是 "mysql"(从配置中读取)
conn_alias = settings.TORTOISE_ORM["apps"]["models"]["default_connection"]
from tortoise import connections
# 尝试获取连接,如果失败则重新初始化
try:
conn = connections.get(conn_alias)
except Exception:
# 如果连接不存在,重新初始化 Tortoise
await Tortoise.init(config=settings.TORTOISE_ORM)
conn = connections.get(conn_alias)
# 删除表
await conn.execute_query("DROP TABLE IF EXISTS `valuation_calculation_steps`")
logger.info("Dropped valuation_calculation_steps table")
# 重新生成表结构(包含正确的唯一索引)
# 使用 safe=True 避免尝试创建已存在的其他表(如 user_role只创建不存在的表
await Tortoise.generate_schemas(safe=True)
logger.info("Table regenerated successfully with correct unique index")
except Exception as recreate_err:
logger.error(f"Failed to recreate table: {recreate_err}")
raise
else:
raise
async def init_roles(): async def init_roles():

View File

@ -149,7 +149,8 @@ class HttpAuditLogMiddleware(BaseHTTPMiddleware):
try: try:
return json.loads(stripped) return json.loads(stripped)
except (ValueError, TypeError): except (ValueError, TypeError):
return stripped # 将非 JSON 字符串包装为字典,以便 JSONField 能够正确存储
return {"text": stripped}
if isinstance(value, (dict, list, int, float, bool)): if isinstance(value, (dict, list, int, float, bool)):
return value return value

View File

@ -101,19 +101,28 @@ class ValuationCalculationStep(Model):
"""估值计算步骤模型""" """估值计算步骤模型"""
id = fields.IntField(pk=True, description="主键ID") id = fields.IntField(pk=True, description="主键ID")
valuation = fields.ForeignKeyField("models.ValuationAssessment", related_name="calculation_steps", description="关联的估值评估") valuation = fields.ForeignKeyField("models.ValuationAssessment", related_name="calculation_steps", description="关联的估值评估")
formula_code = fields.CharField(max_length=64, null=True, description="公式编码")
formula_name = fields.CharField(max_length=255, null=True, description="公式名称")
formula_text = fields.TextField(null=True, description="公式说明")
parent_formula_code = fields.CharField(max_length=64, null=True, description="父级公式编码")
group_code = fields.CharField(max_length=64, null=True, description="分组编码")
step_order = fields.DecimalField(max_digits=8, decimal_places=3, description="步骤顺序") step_order = fields.DecimalField(max_digits=8, decimal_places=3, description="步骤顺序")
step_name = fields.CharField(max_length=255, description="步骤名称") step_name = fields.CharField(max_length=255, description="步骤名称")
step_description = fields.TextField(null=True, description="步骤描述") step_description = fields.TextField(null=True, description="步骤描述")
input_params = fields.JSONField(null=True, description="输入参数") input_params = fields.JSONField(null=True, description="输入参数")
output_result = fields.JSONField(null=True, description="输出结果") output_result = fields.JSONField(null=True, description="输出结果")
status = fields.CharField(max_length=20, default="SUCCESS", description="步骤状态: SUCCESS, FAILED") status = fields.CharField(max_length=20, default="processing", description="步骤状态: processing, completed, failed")
error_message = fields.TextField(null=True, description="错误信息") error_message = fields.TextField(null=True, description="错误信息")
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间") created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")
class Meta: class Meta:
table = "valuation_calculation_steps" table = "valuation_calculation_steps"
table_description = "估值计算步骤表" table_description = "估值计算步骤表"
ordering = ["step_order"] ordering = ["step_order"]
# 唯一索引同一估值ID下同一公式编码只能有一条记录
# 注意formula_code 允许为 NULL但新逻辑中 formula_code 总是有值
unique_together = [("valuation", "formula_code")]
def __str__(self): def __str__(self):
return f"估值ID {self.valuation_id} - 步骤 {self.step_order}: {self.step_name}" return f"估值ID {self.valuation_id} - 步骤 {self.step_order}: {self.step_name}"

View File

@ -354,8 +354,13 @@ class ValuationCalculationStepBase(BaseModel):
step_description: Optional[str] = Field(None, description="步骤描述") step_description: Optional[str] = Field(None, description="步骤描述")
input_params: Optional[Dict[str, Any]] = Field(None, description="输入参数") input_params: Optional[Dict[str, Any]] = Field(None, description="输入参数")
output_result: Optional[Dict[str, Any]] = Field(None, description="输出结果") output_result: Optional[Dict[str, Any]] = Field(None, description="输出结果")
status: str = Field(..., description="步骤状态") status: str = Field(..., description="步骤状态: processing/completed/failed")
error_message: Optional[str] = Field(None, description="错误信息") error_message: Optional[str] = Field(None, description="错误信息")
formula_code: Optional[str] = Field(None, description="公式编码")
formula_name: Optional[str] = Field(None, description="公式名称")
formula_text: Optional[str] = Field(None, description="公式说明")
parent_formula_code: Optional[str] = Field(None, description="父级公式编码")
group_code: Optional[str] = Field(None, description="分组编码")
@field_validator('step_order', mode='before') @field_validator('step_order', mode='before')
@classmethod @classmethod
@ -380,6 +385,7 @@ class ValuationCalculationStepOut(ValuationCalculationStepBase):
id: int = Field(..., description="主键ID") id: int = Field(..., description="主键ID")
valuation_id: int = Field(..., description="关联的估值评估ID") valuation_id: int = Field(..., description="关联的估值评估ID")
created_at: datetime = Field(..., description="创建时间") created_at: datetime = Field(..., description="创建时间")
updated_at: datetime = Field(..., description="更新时间")
class Config: class Config:
from_attributes = True from_attributes = True

View File

@ -1,53 +1,53 @@
''' """
这是非物质文化遗产IP知识产权评估系统的核心计算引擎包 非遗资产估值计算引擎包
''' 提供各类计算器并通过懒加载避免循环依赖
from app.utils.calculation_engine.economic_value_b1 import EconomicValueB1Calculator """
from app.utils.calculation_engine.economic_value_b1.sub_formulas import ( from importlib import import_module
BasicValueB11Calculator, from typing import Any
TrafficFactorB12Calculator,
PolicyMultiplierB13Calculator
)
from app.utils.calculation_engine.cultural_value_b2 import CulturalValueB2Calculator
from app.utils.calculation_engine.cultural_value_b2.sub_formulas import (
LivingHeritageB21Calculator,
PatternGeneB22Calculator
)
from app.utils.calculation_engine.risk_adjustment_b3 import RiskAdjustmentB3Calculator
from app.utils.calculation_engine.market_value_c import MarketValueCCalculator
from app.utils.calculation_engine.market_value_c.sub_formulas import (
MarketBiddingC1Calculator,
HeatCoefficientC2Calculator,
ScarcityMultiplierC3Calculator,
TemporalDecayC4Calculator
)
from app.utils.calculation_engine.final_value_ab import FinalValueACalculator
__version__ = "1.0.0" __version__ = "1.0.0"
__author__ = "Assessment Team" __author__ = "Assessment Team"
__all__ = [ __all__ = [
# 经济价值B1模块
"EconomicValueB1Calculator", "EconomicValueB1Calculator",
"BasicValueB11Calculator", "BasicValueB11Calculator",
"TrafficFactorB12Calculator", "TrafficFactorB12Calculator",
"PolicyMultiplierB13Calculator", "PolicyMultiplierB13Calculator",
# 文化价值B2模块
"CulturalValueB2Calculator", "CulturalValueB2Calculator",
"LivingHeritageB21Calculator", "LivingHeritageB21Calculator",
"PatternGeneB22Calculator", "PatternGeneB22Calculator",
# 风险调整系数B3模块
"RiskAdjustmentB3Calculator", "RiskAdjustmentB3Calculator",
# 市场估值C模块
"MarketValueCCalculator", "MarketValueCCalculator",
"MarketBiddingC1Calculator", "MarketBiddingC1Calculator",
"HeatCoefficientC2Calculator", "HeatCoefficientC2Calculator",
"ScarcityMultiplierC3Calculator", "ScarcityMultiplierC3Calculator",
"TemporalDecayC4Calculator", "TemporalDecayC4Calculator",
"FinalValueACalculator",
# 最终估值A模块
"FinalValueACalculator"
] ]
_EXPORT_MAP = {
"EconomicValueB1Calculator": "app.utils.calculation_engine.economic_value_b1",
"BasicValueB11Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11",
"TrafficFactorB12Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12",
"PolicyMultiplierB13Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13",
"CulturalValueB2Calculator": "app.utils.calculation_engine.cultural_value_b2.cultural_value_b2",
"LivingHeritageB21Calculator": "app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21",
"PatternGeneB22Calculator": "app.utils.calculation_engine.cultural_value_b2.sub_formulas.pattern_gene_b22",
"RiskAdjustmentB3Calculator": "app.utils.calculation_engine.risk_adjustment_b3.sub_formulas.risk_adjustment_b3",
"MarketValueCCalculator": "app.utils.calculation_engine.market_value_c.market_value_c",
"MarketBiddingC1Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.market_bidding_c1",
"HeatCoefficientC2Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.heat_coefficient_c2",
"ScarcityMultiplierC3Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.scarcity_multiplier_c3",
"TemporalDecayC4Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.temporal_decay_c4",
"FinalValueACalculator": "app.utils.calculation_engine.final_value_ab.final_value_a",
}
def __getattr__(name: str) -> Any:
module_path = _EXPORT_MAP.get(name)
if not module_path:
raise AttributeError(f"module {__name__} has no attribute {name}")
module = import_module(module_path)
attr = getattr(module, name)
globals()[name] = attr
return attr

View File

@ -18,13 +18,12 @@ try:
from .sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator from .sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
from .sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator from .sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError: except ImportError:
# 绝对导入(当直接运行时) # 绝对导入(当直接运行时)
from sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator from sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
from sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator from sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate from app.controllers.valuation import ValuationController
class CulturalValueB2Calculator: class CulturalValueB2Calculator:
@ -54,7 +53,7 @@ class CulturalValueB2Calculator:
return cultural_value return cultural_value
async def calculate_complete_cultural_value_b2(self, valuation_id: int, input_data: Dict) -> float: async def calculate_complete_cultural_value_b2(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
""" """
计算完整的文化价值B2并记录所有计算步骤 计算完整的文化价值B2并记录所有计算步骤
@ -73,38 +72,59 @@ class CulturalValueB2Calculator:
} }
Returns: Returns:
float: 计算得出的文化价值B2 Dict[str, float]: 包含文化价值B2及子公式结果的字典
Raises: Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出 Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出
""" """
step = await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "MODEL_B_CULTURAL_B2",
step_order=2.2, status="processing",
step_name="文化价值B2计算", input_params=input_data,
step_description="开始计算文化价值B2公式为活态传承系数B21 × 0.6 + (纹样基因值B22 / 10) × 0.4",
input_params=input_data,
status="in_progress"
)
) )
try: try:
# 计算活态传承系数B21 # 计算活态传承系数B21
teaching_frequency = self.living_heritage_calculator.calculate_teaching_frequency(
input_data["offline_sessions"],
input_data["douyin_views"],
input_data["kuaishou_views"],
input_data["bilibili_views"]
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B21_TEACHING_FREQ",
status="completed",
input_params={
"offline_sessions": input_data.get("offline_sessions"),
"douyin_views": input_data.get("douyin_views"),
"kuaishou_views": input_data.get("kuaishou_views"),
"bilibili_views": input_data.get("bilibili_views"),
},
output_result={"teaching_frequency": teaching_frequency},
)
living_heritage_b21 = self.living_heritage_calculator.calculate_living_heritage_b21( living_heritage_b21 = self.living_heritage_calculator.calculate_living_heritage_b21(
input_data['inheritor_level_coefficient'], input_data['inheritor_level_coefficient'],
self.living_heritage_calculator.calculate_teaching_frequency( teaching_frequency,
input_data["offline_sessions"],
input_data["douyin_views"],
input_data["kuaishou_views"],
input_data["bilibili_views"]
),
input_data['cross_border_depth'] input_data['cross_border_depth']
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.21, step_name="活态传承系数B21", "MODEL_B_CULTURAL_B21",
output_result={'living_heritage_b21': living_heritage_b21}, status="completed" status="completed",
) input_params={
"inheritor_level_coefficient": input_data.get("inheritor_level_coefficient"),
"offline_sessions": input_data.get("offline_sessions"),
"douyin_views": input_data.get("douyin_views"),
"kuaishou_views": input_data.get("kuaishou_views"),
"bilibili_views": input_data.get("bilibili_views"),
"cross_border_depth": input_data.get("cross_border_depth"),
},
output_result={
"living_heritage_b21": living_heritage_b21,
"teaching_frequency": teaching_frequency,
},
) )
# 计算纹样基因值B22 # 计算纹样基因值B22
@ -113,11 +133,16 @@ class CulturalValueB2Calculator:
input_data['normalized_entropy'], input_data['normalized_entropy'],
input_data['historical_inheritance'] input_data['historical_inheritance']
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.22, step_name="纹样基因值B22", "MODEL_B_CULTURAL_B22",
output_result={'pattern_gene_b22': pattern_gene_b22}, status="completed" status="completed",
) input_params={
"structure_complexity": input_data.get("structure_complexity"),
"normalized_entropy": input_data.get("normalized_entropy"),
"historical_inheritance": input_data.get("historical_inheritance"),
},
output_result={"pattern_gene_b22": pattern_gene_b22},
) )
# 计算文化价值B2 # 计算文化价值B2
@ -126,14 +151,24 @@ class CulturalValueB2Calculator:
pattern_gene_b22 pattern_gene_b22
) )
await self.valuation_controller.update_calculation_step( result = {
step.id, {"status": "completed", "output_result": {"cultural_value_b2": cultural_value_b2}} "cultural_value_b2": cultural_value_b2,
"living_heritage_b21": living_heritage_b21,
"pattern_gene_b22": pattern_gene_b22,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B2",
status="completed",
output_result=result,
) )
return cultural_value_b2 return result
except Exception as e: except Exception as e:
error_message = f"文化价值B2计算失败: {e}" await self.valuation_controller.log_formula_step(
await self.valuation_controller.update_calculation_step( valuation_id,
step.id, {"status": "failed", "error_message": error_message} "MODEL_B_CULTURAL_B2",
status="failed",
error_message=str(e),
) )
raise raise

View File

@ -7,7 +7,6 @@
from typing import Dict from typing import Dict
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
from .sub_formulas.basic_value_b11 import BasicValueB11Calculator from .sub_formulas.basic_value_b11 import BasicValueB11Calculator
from .sub_formulas.traffic_factor_b12 import TrafficFactorB12Calculator from .sub_formulas.traffic_factor_b12 import TrafficFactorB12Calculator
@ -46,7 +45,7 @@ class EconomicValueB1Calculator:
return economic_value return economic_value
async def calculate_complete_economic_value_b1(self, valuation_id: int, input_data: Dict) -> float: async def calculate_complete_economic_value_b1(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
""" """
计算完整的经济价值B1并记录所有计算步骤 计算完整的经济价值B1并记录所有计算步骤
@ -66,81 +65,229 @@ class EconomicValueB1Calculator:
} }
Returns: Returns:
float: 计算得出的经济价值B1 Dict[str, float]: 包含经济价值B1及各子公式结果的字典
Raises: Raises:
Exception: 在计算过程中发生的任何异常都会被捕获记录并重新抛出 Exception: 在计算过程中发生的任何异常都会被捕获记录并重新抛出
""" """
step = await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "MODEL_B_ECON_B1",
step_order=2.1, status="processing",
step_name="经济价值B1计算", input_params=input_data,
step_description="开始计算经济价值B1公式为基础价值B11 × (1 + 流量因子B12) × 政策乘数B13",
input_params=input_data,
status="in_progress"
)
) )
try: try:
# 计算基础价值B11 financial_value = self.basic_value_calculator.calculate_financial_value_f(input_data["three_year_income"])
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_FINANCIAL_VALUE",
status="completed",
input_params={"three_year_income": input_data.get("three_year_income")},
output_result={"financial_value_f": financial_value},
)
legal_strength = self.basic_value_calculator.calculate_legal_strength_l(
input_data["patent_score"],
input_data["popularity_score"],
input_data["infringement_score"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_LEGAL_STRENGTH",
status="completed",
input_params={
"patent_score": input_data.get("patent_score"),
"popularity_score": input_data.get("popularity_score"),
"infringement_score": input_data.get("infringement_score"),
},
output_result={"legal_strength_l": legal_strength},
)
development_potential = self.basic_value_calculator.calculate_development_potential_d(
input_data["patent_count"],
input_data["esg_score"],
input_data["innovation_ratio"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_DEVELOPMENT_POTENTIAL",
status="completed",
input_params={
"patent_count": input_data.get("patent_count"),
"esg_score": input_data.get("esg_score"),
"innovation_ratio": input_data.get("innovation_ratio"),
},
output_result={"development_potential_d": development_potential},
)
industry_coefficient = input_data["industry_coefficient"]
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_INDUSTRY_COEFFICIENT",
status="completed",
input_params={"industry_coefficient": industry_coefficient},
output_result={"industry_coefficient": industry_coefficient},
)
basic_value_b11 = self.basic_value_calculator.calculate_basic_value_b11( basic_value_b11 = self.basic_value_calculator.calculate_basic_value_b11(
self.basic_value_calculator.calculate_financial_value_f(input_data["three_year_income"]), financial_value,
self.basic_value_calculator.calculate_legal_strength_l(input_data["patent_score"], input_data["popularity_score"], input_data["infringement_score"]), legal_strength,
self.basic_value_calculator.calculate_development_potential_d(input_data["patent_count"], input_data["esg_score"], input_data["innovation_ratio"]), development_potential,
input_data["industry_coefficient"] industry_coefficient,
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.11, step_name="基础价值B11", "MODEL_B_ECON_B11",
output_result={'basic_value_b11': basic_value_b11}, status="completed" status="completed",
) input_params={
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
"industry_coefficient": industry_coefficient,
},
output_result={
"basic_value_b11": basic_value_b11,
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
"industry_coefficient": industry_coefficient,
},
)
interaction_index = self.traffic_factor_calculator.calculate_interaction_index(
input_data["likes"],
input_data["comments"],
input_data["shares"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_INTERACTION_INDEX",
status="completed",
input_params={
"likes": input_data.get("likes"),
"comments": input_data.get("comments"),
"shares": input_data.get("shares"),
},
output_result={"interaction_index": interaction_index},
)
coverage_index = self.traffic_factor_calculator.calculate_coverage_index(input_data.get("followers", 0))
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_COVERAGE_INDEX",
status="completed",
input_params={"followers": input_data.get("followers", 0)},
output_result={"coverage_index": coverage_index},
)
conversion_efficiency = self.traffic_factor_calculator.calculate_conversion_efficiency(
input_data["sales_volume"],
input_data["link_views"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_CONVERSION_EFFICIENCY",
status="completed",
input_params={
"sales_volume": input_data.get("sales_volume"),
"link_views": input_data.get("link_views"),
},
output_result={"conversion_efficiency": conversion_efficiency},
)
social_media_spread_s3 = self.traffic_factor_calculator.calculate_social_media_spread_s3(
interaction_index,
coverage_index,
conversion_efficiency,
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_SOCIAL_SPREAD",
status="completed",
input_params={
"interaction_index": interaction_index,
"coverage_index": coverage_index,
"conversion_efficiency": conversion_efficiency,
},
output_result={"social_media_spread_s3": social_media_spread_s3},
) )
# 计算流量因子B12
traffic_factor_b12 = self.traffic_factor_calculator.calculate_traffic_factor_b12( traffic_factor_b12 = self.traffic_factor_calculator.calculate_traffic_factor_b12(
input_data['search_index_s1'], input_data["search_index_s1"],
input_data['industry_average_s2'], input_data["industry_average_s2"],
self.traffic_factor_calculator.calculate_social_media_spread_s3( social_media_spread_s3,
self.traffic_factor_calculator.calculate_interaction_index(input_data["likes"], input_data["comments"], input_data["shares"]),
self.traffic_factor_calculator.calculate_coverage_index(0),
self.traffic_factor_calculator.calculate_conversion_efficiency(input_data["sales_volume"], input_data["link_views"])
)
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.12, step_name="流量因子B12", "MODEL_B_FLOW_B12",
output_result={'traffic_factor_b12': traffic_factor_b12}, status="completed" status="completed",
) input_params={
"search_index_s1": input_data.get("search_index_s1"),
"industry_average_s2": input_data.get("industry_average_s2"),
},
output_result={
"traffic_factor_b12": traffic_factor_b12,
"social_media_spread_s3": social_media_spread_s3,
},
)
policy_compatibility = self.policy_multiplier_calculator.calculate_policy_compatibility_score(
input_data["policy_match_score"],
input_data["implementation_stage"],
input_data["funding_support"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_POLICY_B13",
status="processing",
input_params={
"policy_match_score": input_data.get("policy_match_score"),
"implementation_stage": input_data.get("implementation_stage"),
"funding_support": input_data.get("funding_support"),
},
output_result={"policy_compatibility_score": policy_compatibility},
) )
# 计算政策乘数B13
policy_multiplier_b13 = self.policy_multiplier_calculator.calculate_policy_multiplier_b13( policy_multiplier_b13 = self.policy_multiplier_calculator.calculate_policy_multiplier_b13(
self.policy_multiplier_calculator.calculate_policy_compatibility_score( policy_compatibility,
input_data["policy_match_score"], input_data["implementation_stage"], input_data["funding_support"]
)
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.13, step_name="政策乘数B13", "MODEL_B_POLICY_B13",
output_result={'policy_multiplier_b13': policy_multiplier_b13}, status="completed" status="completed",
) output_result={
"policy_multiplier_b13": policy_multiplier_b13,
"policy_compatibility_score": policy_compatibility,
},
) )
# 计算经济价值B1
economic_value_b1 = self.calculate_economic_value_b1( economic_value_b1 = self.calculate_economic_value_b1(
basic_value_b11, basic_value_b11,
traffic_factor_b12, traffic_factor_b12,
policy_multiplier_b13 policy_multiplier_b13,
) )
result = {
await self.valuation_controller.update_calculation_step( "economic_value_b1": economic_value_b1,
step.id, {"status": "completed", "output_result": {"economic_value_b1": economic_value_b1}} "basic_value_b11": basic_value_b11,
"traffic_factor_b12": traffic_factor_b12,
"policy_multiplier_b13": policy_multiplier_b13,
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B1",
status="completed",
output_result=result,
) )
return economic_value_b1 return result
except Exception as e: except Exception as e:
error_message = f"经济价值B1计算失败: {e}" await self.valuation_controller.log_formula_step(
await self.valuation_controller.update_calculation_step( valuation_id,
step.id, {"status": "failed", "error_message": error_message} "MODEL_B_ECON_B1",
status="failed",
error_message=str(e),
) )
raise raise

View File

@ -20,13 +20,11 @@ try:
from .model_value_b import ModelValueBCalculator from .model_value_b import ModelValueBCalculator
from ..market_value_c import MarketValueCCalculator from ..market_value_c import MarketValueCCalculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError: except ImportError:
# 直接运行时的绝对导入 # 直接运行时的绝对导入
from app.utils.calculation_engine.final_value_ab.model_value_b import ModelValueBCalculator from app.utils.calculation_engine.final_value_ab.model_value_b import ModelValueBCalculator
from app.utils.calculation_engine.market_value_c import MarketValueCCalculator from app.utils.calculation_engine.market_value_c import MarketValueCCalculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
class FinalValueACalculator: class FinalValueACalculator:
@ -94,25 +92,17 @@ class FinalValueACalculator:
""" """
import time import time
start_time = time.time() start_time = time.time()
step_order = 1
# 记录输入参数
logger.info("final_value_a.calculation_start input_data_keys={} model_data_keys={} market_data_keys={}", logger.info("final_value_a.calculation_start input_data_keys={} model_data_keys={} market_data_keys={}",
list(input_data.keys()), list(input_data.keys()),
list(input_data.get('model_data', {}).keys()), list(input_data.get('model_data', {}).keys()),
list(input_data.get('market_data', {}).keys())) list(input_data.get('market_data', {}).keys()))
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "FINAL_A",
step_order=step_order, status="processing",
step_name="开始计算最终估值A", input_params=input_data,
step_description="接收输入参数,准备开始计算。",
input_params=input_data,
status="processing"
)
) )
step_order += 1
try: try:
# 详细记录模型数据参数 # 详细记录模型数据参数
@ -178,19 +168,6 @@ class FinalValueACalculator:
int(model_duration * 1000), int(model_duration * 1000),
list(model_result.keys())) list(model_result.keys()))
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算模型估值B",
step_description="调用ModelValueBCalculator计算模型估值B。",
input_params=input_data.get('model_data', {}),
output_result=model_result,
status="completed"
)
)
step_order += 1
# 计算市场估值C # 计算市场估值C
logger.info("final_value_a.calculating_market_value_c 开始计算市场估值C") logger.info("final_value_a.calculating_market_value_c 开始计算市场估值C")
market_start_time = time.time() market_start_time = time.time()
@ -208,19 +185,6 @@ class FinalValueACalculator:
int(market_duration * 1000), int(market_duration * 1000),
input_data['market_data']) input_data['market_data'])
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算市场估值C",
step_description="调用MarketValueCCalculator计算市场估值C。",
input_params=input_data.get('market_data', {}),
output_result=market_result,
status="completed"
)
)
step_order += 1
# 计算最终估值A # 计算最终估值A
logger.info("final_value_a.calculating_final_value_a 开始计算最终估值A: 模型估值B={}万元 市场估值C={}万元", logger.info("final_value_a.calculating_final_value_a 开始计算最终估值A: 模型估值B={}万元 市场估值C={}万元",
model_value_b, market_value_c) model_value_b, market_value_c)
@ -240,16 +204,18 @@ class FinalValueACalculator:
int(model_duration * 1000), int(model_duration * 1000),
int(market_duration * 1000)) int(market_duration * 1000))
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "FINAL_A",
step_order=step_order, status="completed",
step_name="计算最终估值A", output_result={
step_description="最终估值A = 模型估值B × 0.7 + 市场估值C × 0.3", "model_value_b": model_value_b,
input_params={"model_value_b": model_value_b, "market_value_c": market_value_c}, "market_value_c": market_value_c,
output_result={"final_value_a": final_value_a}, "final_value_ab": final_value_a,
status="completed" "model_duration_ms": int(model_duration * 1000),
) "market_duration_ms": int(market_duration * 1000),
"total_duration_ms": int(total_duration * 1000),
},
) )
return { return {
"model_value_b": model_value_b, "model_value_b": model_value_b,
@ -259,15 +225,11 @@ class FinalValueACalculator:
except Exception as e: except Exception as e:
logger.error("final_value_a.calculation_failed 计算失败: 错误={}", str(e)) logger.error("final_value_a.calculation_failed 计算失败: 错误={}", str(e))
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "FINAL_A",
step_order=step_order, status="failed",
step_name="计算失败", error_message=str(e),
step_description="计算过程中发生错误。",
status="failed",
error_message=str(e)
)
) )
raise raise

View File

@ -13,13 +13,11 @@ try:
from ..economic_value_b1.economic_value_b1 import EconomicValueB1Calculator from ..economic_value_b1.economic_value_b1 import EconomicValueB1Calculator
from ..cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator from ..cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError: except ImportError:
# 绝对导入(当直接运行时) # 绝对导入(当直接运行时)
from app.utils.calculation_engine.economic_value_b1.economic_value_b1 import EconomicValueB1Calculator from app.utils.calculation_engine.economic_value_b1.economic_value_b1 import EconomicValueB1Calculator
from app.utils.calculation_engine.cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator from app.utils.calculation_engine.cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
class ModelValueBCalculator: class ModelValueBCalculator:
@ -74,18 +72,12 @@ class ModelValueBCalculator:
Raises: Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录然后重新抛出 Exception: 在计算过程中遇到的任何异常都会被捕获记录然后重新抛出
""" """
step_order = 1 await self.valuation_controller.log_formula_step(
await self.valuation_controller.create_calculation_step( valuation_id,
ValuationCalculationStepCreate( "MODEL_B",
valuation_id=valuation_id, status="processing",
step_order=step_order, input_params=input_data,
step_name="开始计算模型估值B",
step_description="接收输入参数,准备开始计算。",
input_params=input_data,
status="processing"
)
) )
step_order += 1
current_stage = "初始化模型估值B参数" current_stage = "初始化模型估值B参数"
try: try:
@ -102,60 +94,27 @@ class ModelValueBCalculator:
# 计算经济价值B1传入估值ID并等待异步完成 # 计算经济价值B1传入估值ID并等待异步完成
current_stage = "经济价值B1计算" current_stage = "经济价值B1计算"
economic_value_b1 = await self.economic_value_calculator.calculate_complete_economic_value_b1( economic_result = await self.economic_value_calculator.calculate_complete_economic_value_b1(
valuation_id, valuation_id,
input_data['economic_data'] input_data['economic_data']
) )
await self.valuation_controller.create_calculation_step( economic_value_b1 = economic_result["economic_value_b1"]
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算经济价值B1",
step_description="调用EconomicValueB1Calculator计算经济价值B1。",
input_params=input_data.get('economic_data', {}),
output_result={"economic_value_b1": economic_value_b1},
status="completed"
)
)
step_order += 1
# 计算文化价值B2传入估值ID并等待异步完成 # 计算文化价值B2传入估值ID并等待异步完成
current_stage = "文化价值B2计算" current_stage = "文化价值B2计算"
cultural_value_b2 = await self.cultural_value_calculator.calculate_complete_cultural_value_b2( cultural_result = await self.cultural_value_calculator.calculate_complete_cultural_value_b2(
valuation_id, valuation_id,
input_data['cultural_data'] input_data['cultural_data']
) )
await self.valuation_controller.create_calculation_step( cultural_value_b2 = cultural_result["cultural_value_b2"]
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算文化价值B2",
step_description="调用CulturalValueB2Calculator计算文化价值B2。",
input_params=input_data.get('cultural_data', {}),
output_result={"cultural_value_b2": cultural_value_b2},
status="completed"
)
)
step_order += 1
# 计算风险调整系数B3传入估值ID并等待异步完成 # 计算风险调整系数B3传入估值ID并等待异步完成
current_stage = "风险调整系数B3计算" current_stage = "风险调整系数B3计算"
risk_value_b3 = await self.risk_adjustment_calculator.calculate_complete_risky_value_b3( risk_result = await self.risk_adjustment_calculator.calculate_complete_risky_value_b3(
valuation_id, valuation_id,
input_data['risky_data'] input_data['risky_data']
) )
await self.valuation_controller.create_calculation_step( risk_value_b3 = risk_result["risk_value_b3"]
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算风险调整系数B3",
step_description="调用RiskAdjustmentB3Calculator计算风险调整系数B3。",
input_params=input_data.get('risky_data', {}),
output_result={"risk_adjustment_b3": risk_value_b3},
status="completed"
)
)
step_order += 1
# 计算模型估值B # 计算模型估值B
current_stage = "模型估值B汇总" current_stage = "模型估值B汇总"
@ -164,33 +123,28 @@ class ModelValueBCalculator:
cultural_value_b2, cultural_value_b2,
risk_value_b3 risk_value_b3
) )
await self.valuation_controller.create_calculation_step( result = {
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算模型估值B",
step_description="模型估值B = 经济价值B1*0.7+文化价值B2*0.3*风险调整系数B3",
input_params={"economic_value_b1": economic_value_b1, "cultural_value_b2": cultural_value_b2, "risk_value_b3": risk_value_b3},
output_result={"model_value_b": model_value_b},
status="completed"
)
)
return {
"economic_value_b1": economic_value_b1, "economic_value_b1": economic_value_b1,
"cultural_value_b2": cultural_value_b2, "cultural_value_b2": cultural_value_b2,
"risk_value_b3": risk_value_b3, "risk_value_b3": risk_value_b3,
"model_value_b": model_value_b, "model_value_b": model_value_b,
"economic_details": economic_result,
"cultural_details": cultural_result,
"risk_details": risk_result,
} }
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B",
status="completed",
output_result=result,
)
return result
except Exception as e: except Exception as e:
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "MODEL_B",
step_order=step_order, status="failed",
step_name="计算失败", error_message=f"{current_stage}失败: {e}",
step_description="计算过程中发生错误。",
status="failed",
error_message=f"{current_stage}失败: {e}"
)
) )
raise raise

View File

@ -0,0 +1,271 @@
"""
公式元数据注册表
用于将计算引擎中的每个公式节点含子公式映射到唯一的 code名称公式说明以及排序
以便在 valuation_calculation_steps 表中进行结构化记录并最终生成可读的计算报告
"""
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional
@dataclass(frozen=True)
class FormulaMeta:
code: str
name: str
formula: str
order: Decimal
parent_code: Optional[str]
group_code: str
FormulaTreeNode = Dict[str, object]
def _node(
code: str,
name: str,
formula: str,
order: str,
children: Optional[List[FormulaTreeNode]] = None,
group: Optional[str] = None,
) -> FormulaTreeNode:
return {
"code": code,
"name": name,
"formula": formula,
"order": order,
"group_code": group,
"children": children or [],
}
FORMULA_TREE: List[FormulaTreeNode] = [
_node(
"FINAL_A",
"最终估值A",
"最终估值A = 模型估值B × 0.7 + 市场估值C × 0.3",
"10",
children=[
_node(
"MODEL_B",
"模型估值B",
"模型估值B = 经济价值B1 × 0.7 + 文化价值B2 × 0.3 × 风险调整系数B3",
"20",
group="MODEL_B",
children=[
_node(
"MODEL_B_ECON_B1",
"经济价值B1",
"经济价值B1 = 基础价值B11 × (1 + 流量因子B12) × 政策乘数B13",
"21",
children=[
_node(
"MODEL_B_ECON_B11",
"基础价值B11",
"基础价值B11 = 财务价值F × (0.45 + 0.05 × 行业系数I) + 法律强度L × (0.35 + 0.05 × 行业系数I) + 发展潜力D × 0.2",
"21.1",
children=[
_node(
"MODEL_B_ECON_B11_FINANCIAL_VALUE",
"财务价值F",
"财务价值F = [3年内年均收益 × (1 + 增长率)^5] ÷ 5",
"21.11",
),
_node(
"MODEL_B_ECON_B11_LEGAL_STRENGTH",
"法律强度L",
"法律强度L = 专利分 × 0.4 + 普及分 × 0.3 + 侵权分 × 0.3",
"21.12",
),
_node(
"MODEL_B_ECON_B11_DEVELOPMENT_POTENTIAL",
"发展潜力D",
"发展潜力D = 专利分 × 0.5 + ESG分 × 0.2 + 创新投入比 × 0.3",
"21.13",
),
_node(
"MODEL_B_ECON_B11_INDUSTRY_COEFFICIENT",
"行业系数I",
"行业系数I = (目标行业平均ROE - 基准行业ROE) ÷ 基准行业ROE",
"21.14",
),
],
),
_node(
"MODEL_B_FLOW_B12",
"流量因子B12",
"流量因子B12 = ln(S1 ÷ S2) × 0.3 + 社交媒体传播度S3 × 0.7",
"21.2",
children=[
_node(
"MODEL_B_FLOW_B12_INTERACTION_INDEX",
"互动量指数",
"互动量指数 = (点赞 + 评论 + 分享) ÷ 1000",
"21.21",
),
_node(
"MODEL_B_FLOW_B12_COVERAGE_INDEX",
"覆盖人群指数",
"覆盖人群指数 = 粉丝数 ÷ 10000",
"21.22",
),
_node(
"MODEL_B_FLOW_B12_CONVERSION_EFFICIENCY",
"转化效率",
"转化效率 = 商品链接点击量 ÷ 内容浏览量",
"21.23",
),
_node(
"MODEL_B_FLOW_B12_SOCIAL_SPREAD",
"社交媒体传播度S3",
"社交媒体传播度S3 = 互动量指数 × 0.4 + 覆盖人群指数 × 0.3 + 转化效率 × 0.3",
"21.24",
),
],
),
_node(
"MODEL_B_POLICY_B13",
"政策乘数B13",
"政策乘数B13 = 1 + 政策契合度评分P × 0.15,其中 P = 政策匹配度 × 0.4 + 实施阶段评分 × 0.3 + 资金支持度 × 0.3",
"21.3",
),
],
),
_node(
"MODEL_B_CULTURAL_B2",
"文化价值B2",
"文化价值B2 = 活态传承系数B21 × 0.6 + (纹样基因值B22 ÷ 10) × 0.4",
"22",
children=[
_node(
"MODEL_B_CULTURAL_B21",
"活态传承系数B21",
"活态传承系数B21 = 传承人等级系数 × 0.4 + 教学传播频次 × 0.3 + 跨界合作深度 × 0.3",
"22.1",
children=[
_node(
"MODEL_B_CULTURAL_B21_TEACHING_FREQ",
"教学传播频次",
"教学传播频次 = 线下传习次数 × 0.6 + 线上课程点击量(万) × 0.4",
"22.11",
),
],
),
_node(
"MODEL_B_CULTURAL_B22",
"纹样基因值B22",
"纹样基因值B22 = (结构复杂度SC × 0.6 + 归一化信息熵H × 0.4) × 历史传承度HI × 10",
"22.2",
),
],
),
_node(
"MODEL_B_RISK_B3",
"风险调整系数B3",
"风险调整系数B3 = 0.8 + 风险评分总和R × 0.4,其中 R = 市场风险 × 0.3 + 法律风险 × 0.4 + 传承风险 × 0.3",
"23",
children=[
_node(
"MODEL_B_RISK_B3_MARKET",
"市场风险",
"市场风险依据价格波动率:波动率 ≤5% 计10分5-15%计5分>15%计0分",
"23.1",
),
_node(
"MODEL_B_RISK_B3_LEGAL",
"法律风险",
"法律风险根据诉讼状态评分(无诉讼/已解决/未解决)",
"23.2",
),
_node(
"MODEL_B_RISK_B3_INHERITANCE",
"传承风险",
"传承风险依据传承人年龄≤50岁10分50-70岁5分>70岁0分取最高分",
"23.3",
),
],
),
],
),
_node(
"MARKET_C",
"市场估值C",
"市场估值C = 市场竞价C1 × 热度系数C2 × 稀缺性乘数C3 × 时效性衰减C4",
"30",
group="MARKET_C",
children=[
_node(
"MARKET_C_C1",
"市场竞价C1",
"市场竞价C1 结合历史交易价格、人工竞价与专家估值的加权结果",
"30.1",
),
_node(
"MARKET_C_C2",
"热度系数C2",
"热度系数C2 = 1 + 浏览热度分(依据日均浏览量与收藏数量)",
"30.2",
),
_node(
"MARKET_C_C3",
"稀缺性乘数C3",
"稀缺性乘数C3 = 1 + 稀缺等级分",
"30.3",
),
_node(
"MARKET_C_C4",
"时效性衰减C4",
"时效性衰减C4 依据距最近市场活动天数的衰减系数",
"30.4",
),
],
),
],
)
]
def _build_index() -> Dict[str, FormulaMeta]:
index: Dict[str, FormulaMeta] = {}
def dfs(nodes: List[FormulaTreeNode], parent_code: Optional[str], group_code: Optional[str]):
for node in nodes:
code = node["code"]
name = node["name"]
formula = node["formula"]
order = Decimal(str(node["order"]))
explicit_group = node.get("group_code")
if explicit_group:
current_group = explicit_group
elif parent_code is None:
current_group = code
else:
current_group = group_code or parent_code
meta = FormulaMeta(
code=code,
name=name,
formula=formula,
order=order,
parent_code=parent_code,
group_code=current_group,
)
index[code] = meta
dfs(node.get("children", []), code, current_group)
dfs(FORMULA_TREE, None, None)
return index
FORMULA_INDEX: Dict[str, FormulaMeta] = _build_index()
def get_formula_meta(code: str) -> FormulaMeta:
meta = FORMULA_INDEX.get(code)
if not meta:
raise KeyError(f"公式编码未注册: {code}")
return meta

View File

@ -20,7 +20,6 @@ try:
from .sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator from .sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
from .market_data_analyzer import market_data_analyzer from .market_data_analyzer import market_data_analyzer
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError: except ImportError:
# 绝对导入(当直接运行时) # 绝对导入(当直接运行时)
from sub_formulas.market_bidding_c1 import MarketBiddingC1Calculator from sub_formulas.market_bidding_c1 import MarketBiddingC1Calculator
@ -29,7 +28,6 @@ except ImportError:
from sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator from sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
from market_data_analyzer import market_data_analyzer from market_data_analyzer import market_data_analyzer
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -103,7 +101,7 @@ class MarketValueCCalculator:
return market_value / 10000.0 return market_value / 10000.0
async def calculate_complete_market_value_c(self, valuation_id: int, input_data: Dict) -> float: async def calculate_complete_market_value_c(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
""" """
计算完整的市场估值C并记录每一步的计算过程 计算完整的市场估值C并记录每一步的计算过程
@ -124,20 +122,16 @@ class MarketValueCCalculator:
} }
Returns: Returns:
float: 计算得出的市场估值C Dict[str, float]: 包含市场估值C及子公式结果的字典
Raises: Raises:
Exception: 如果在计算过程中发生任何错误将记录失败状态并重新抛出异常 Exception: 如果在计算过程中发生任何错误将记录失败状态并重新抛出异常
""" """
step = await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "MARKET_C",
step_order=3, status="processing",
step_name="市场估值C计算", input_params=input_data,
step_description="开始计算市场估值C公式为市场竞价C1 × 热度系数C2 × 稀缺性乘数C3 × 时效性衰减C4",
input_params=input_data,
status="in_progress"
)
) )
try: try:
# 计算市场竞价C1 # 计算市场竞价C1
@ -146,11 +140,16 @@ class MarketValueCCalculator:
manual_bids=input_data.get('manual_bids', []), manual_bids=input_data.get('manual_bids', []),
expert_valuations=input_data.get('expert_valuations', []) expert_valuations=input_data.get('expert_valuations', [])
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=3.1, step_name="市场竞价C1", "MARKET_C_C1",
output_result={'market_bidding_c1': market_bidding_c1}, status="completed" status="completed",
) input_params={
"weighted_average_price": input_data.get('weighted_average_price'),
"manual_bids": input_data.get('manual_bids'),
"expert_valuations": input_data.get('expert_valuations'),
},
output_result={'market_bidding_c1': market_bidding_c1},
) )
# 计算热度系数C2 # 计算热度系数C2
@ -158,33 +157,39 @@ class MarketValueCCalculator:
input_data.get('daily_browse_volume', 500.0), input_data.get('daily_browse_volume', 500.0),
input_data.get('collection_count', 50) input_data.get('collection_count', 50)
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=3.2, step_name="热度系数C2", "MARKET_C_C2",
output_result={'heat_coefficient_c2': heat_coefficient_c2}, status="completed" status="completed",
) input_params={
"daily_browse_volume": input_data.get('daily_browse_volume'),
"collection_count": input_data.get('collection_count'),
},
output_result={'heat_coefficient_c2': heat_coefficient_c2},
) )
# 计算稀缺性乘数C3 # 计算稀缺性乘数C3
scarcity_multiplier_c3 = self.scarcity_multiplier_calculator.calculate_scarcity_multiplier_c3( scarcity_multiplier_c3 = self.scarcity_multiplier_calculator.calculate_scarcity_multiplier_c3(
input_data.get('issuance_level', '限量') input_data.get('issuance_level', '限量')
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=3.3, step_name="稀缺性乘数C3", "MARKET_C_C3",
output_result={'scarcity_multiplier_c3': scarcity_multiplier_c3}, status="completed" status="completed",
) input_params={'issuance_level': input_data.get('issuance_level')},
output_result={'scarcity_multiplier_c3': scarcity_multiplier_c3},
) )
# 计算时效性衰减C4 # 计算时效性衰减C4
temporal_decay_c4 = self.temporal_decay_calculator.calculate_temporal_decay_c4( temporal_decay_c4 = self.temporal_decay_calculator.calculate_temporal_decay_c4(
input_data.get('recent_market_activity', '2024-01-15') input_data.get('recent_market_activity', '2024-01-15')
) )
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=3.4, step_name="时效性衰减C4", "MARKET_C_C4",
output_result={'temporal_decay_c4': temporal_decay_c4}, status="completed" status="completed",
) input_params={'recent_market_activity': input_data.get('recent_market_activity')},
output_result={'temporal_decay_c4': temporal_decay_c4},
) )
# 计算市场估值C # 计算市场估值C
@ -194,16 +199,29 @@ class MarketValueCCalculator:
scarcity_multiplier_c3, scarcity_multiplier_c3,
temporal_decay_c4 temporal_decay_c4
) )
await self.valuation_controller.update_calculation_step( result = {
step.id, {"status": "completed", "output_result": {"market_value_c": market_value_c}} "market_value_c": market_value_c,
"market_bidding_c1": market_bidding_c1,
"heat_coefficient_c2": heat_coefficient_c2,
"scarcity_multiplier_c3": scarcity_multiplier_c3,
"temporal_decay_c4": temporal_decay_c4,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C",
status="completed",
output_result=result,
) )
return market_value_c return result
except Exception as e: except Exception as e:
error_message = f"市场估值C计算失败: {e}" error_message = f"市场估值C计算失败: {e}"
logger.error(error_message, exc_info=True) logger.error(error_message, exc_info=True)
await self.valuation_controller.update_calculation_step( await self.valuation_controller.log_formula_step(
step.id, {"status": "failed", "error_message": error_message} valuation_id,
"MARKET_C",
status="failed",
error_message=str(e),
) )
raise raise

View File

@ -15,9 +15,7 @@ sys.path.append(os.path.join(current_dir, '..', '..', '..', '..'))
try: try:
from app.controllers.valuation import ValuationController from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError: except ImportError:
# 处理可能的导入错误
pass pass
class RiskAdjustmentB3Calculator: class RiskAdjustmentB3Calculator:
@ -167,7 +165,7 @@ class RiskAdjustmentB3Calculator:
return max_score return max_score
async def calculate_complete_risky_value_b3(self, valuation_id: int, input_data: Dict) -> float: async def calculate_complete_risky_value_b3(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
""" """
计算完整的风险调整系数B3并记录所有计算步骤 计算完整的风险调整系数B3并记录所有计算步骤
@ -187,43 +185,45 @@ class RiskAdjustmentB3Calculator:
} }
Returns: Returns:
float: 计算得出的风险调整系数B3 Dict[str, float]: 包含各项风险评分和风险调整系数的字典
Raises: Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出 Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出
""" """
step = await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, "MODEL_B_RISK_B3",
step_order=2.3, status="processing",
step_name="风险调整系数B3计算", input_params=input_data,
step_description="开始计算风险调整系数B3公式为0.8 + 风险评分总和R × 0.4",
input_params=input_data,
status="in_progress"
)
) )
try: try:
# 计算各项风险评分 # 计算各项风险评分
market_risk = self.calculate_market_risk(input_data["highest_price"], input_data["lowest_price"]) market_risk = self.calculate_market_risk(input_data["highest_price"], input_data["lowest_price"])
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.31, step_name="市场风险评分", "MODEL_B_RISK_B3_MARKET",
output_result={'market_risk': market_risk}, status="completed" status="completed",
) input_params={
"highest_price": input_data.get("highest_price"),
"lowest_price": input_data.get("lowest_price"),
},
output_result={'market_risk': market_risk},
) )
legal_risk = self.calculate_legal_risk(input_data["lawsuit_status"]) legal_risk = self.calculate_legal_risk(input_data["lawsuit_status"])
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.32, step_name="法律风险评分", "MODEL_B_RISK_B3_LEGAL",
output_result={'legal_risk': legal_risk}, status="completed" status="completed",
) input_params={"lawsuit_status": input_data.get("lawsuit_status")},
output_result={'legal_risk': legal_risk},
) )
inheritance_risk = self.calculate_inheritance_risk(input_data["inheritor_ages"]) inheritance_risk = self.calculate_inheritance_risk(input_data["inheritor_ages"])
await self.valuation_controller.create_calculation_step( await self.valuation_controller.log_formula_step(
ValuationCalculationStepCreate( valuation_id,
valuation_id=valuation_id, step_order=2.33, step_name="传承风险评分", "MODEL_B_RISK_B3_INHERITANCE",
output_result={'inheritance_risk': inheritance_risk}, status="completed" status="completed",
) input_params={"inheritor_ages": input_data.get("inheritor_ages")},
output_result={'inheritance_risk': inheritance_risk},
) )
# 计算风险评分总和R # 计算风险评分总和R
@ -231,15 +231,27 @@ class RiskAdjustmentB3Calculator:
# 计算风险调整系数B3 # 计算风险调整系数B3
risk_adjustment_b3 = self.calculate_risk_adjustment_b3(risk_score_sum) risk_adjustment_b3 = self.calculate_risk_adjustment_b3(risk_score_sum)
await self.valuation_controller.update_calculation_step( result = {
step.id, {"status": "completed", "output_result": {'risk_adjustment_b3': risk_adjustment_b3}} "risk_value_b3": risk_adjustment_b3,
"risk_score_sum": risk_score_sum,
"market_risk": market_risk,
"legal_risk": legal_risk,
"inheritance_risk": inheritance_risk,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3",
status="completed",
output_result=result,
) )
return risk_adjustment_b3 return result
except Exception as e: except Exception as e:
error_message = f"风险调整系数B3计算失败: {e}" await self.valuation_controller.log_formula_step(
await self.valuation_controller.update_calculation_step( valuation_id,
step.id, {"status": "failed", "error_message": error_message} "MODEL_B_RISK_B3",
status="failed",
error_message=str(e),
) )
raise raise