up repo md

This commit is contained in:
dubingyan666 2025-11-26 17:28:51 +08:00
parent df7ff7af23
commit d347f1b4c9
20 changed files with 1338 additions and 365 deletions

View File

@ -203,7 +203,7 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
status='pending'
status='success'
)
result = await valuation_controller.update(valuation_id, update_data)
logger.info(

View File

@ -106,7 +106,7 @@ async def delete_invoice(id: int = Query(...)):
@invoice_router.post("/update-status", summary="更新发票状态", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_status(data: UpdateStatus):
"""
更新发票状态pending|invoiced|refunded
更新发票状态pending|invoiced|rejected|refunded
"""
out = await invoice_controller.update_status(data)
return Success(data=out or {}, msg="更新成功" if out else "未找到")

View File

@ -60,6 +60,33 @@ async def get_valuation_steps(valuation_id: int):
return Success(data=steps_out, msg="获取计算步骤成功")
@valuations_router.get("/{valuation_id}/report", summary="获取估值计算报告Markdown格式")
async def get_valuation_report(valuation_id: int):
"""
根据估值ID生成计算过程的 Markdown 报告
返回格式化的 Markdown 文档包含
- 估值基本信息
- 计算结果摘要
- 详细计算过程按公式层级组织
- 每个公式的输入参数输出结果状态等信息
"""
try:
markdown = await valuation_controller.get_calculation_report_markdown(valuation_id)
from fastapi import Response
return Response(
content=markdown,
media_type="text/markdown; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="valuation_report_{valuation_id}.md"'
}
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"生成报告失败: {str(e)}")
@valuations_router.put("/{valuation_id}", summary="更新估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def update_valuation(valuation_id: int, data: ValuationAssessmentUpdate):
"""更新估值评估记录"""
@ -92,8 +119,6 @@ async def get_valuations(
submitted_end: Optional[str] = Query(None, description="提交时间结束毫秒或ISO"),
audited_start: Optional[str] = Query(None, description="审核时间开始证书修改时间毫秒或ISO"),
audited_end: Optional[str] = Query(None, description="审核时间结束证书修改时间毫秒或ISO"),
create_start: Optional[str] = Query(None, description="创建时间开始毫秒或ISO"),
create_end: Optional[str] = Query(None, description="创建时间结束毫秒或ISO"),
page: int = Query(1, ge=1, description="页码"),
size: int = Query(10, ge=1, le=100, description="每页数量")
):
@ -110,8 +135,6 @@ async def get_valuations(
submitted_end=submitted_end,
audited_start=audited_start,
audited_end=audited_end,
create_start=create_start,
create_end=create_end,
page=page,
size=size
)

View File

@ -22,7 +22,7 @@ class UserValuationController:
"""用户创建估值评估"""
valuation_data = data.model_dump()
valuation_data['user_id'] = user_id
valuation_data['status'] = status
valuation_data['status'] = "success" # 根据计算结果显示设置状态
# 添加计算结果到数据库
if calculation_result:

View File

@ -1,4 +1,5 @@
from typing import List, Optional
import json
from typing import Any, Dict, List, Optional
from tortoise.expressions import Q
from tortoise.queryset import QuerySet
from tortoise.functions import Count
@ -14,6 +15,7 @@ from app.schemas.valuation import (
ValuationCalculationStepOut
)
from app.models.user import AppUser
from app.utils.calculation_engine.formula_registry import get_formula_meta
class ValuationController:
@ -41,6 +43,152 @@ class ValuationController:
)
return ValuationCalculationStepOut.model_validate(step)
async def log_formula_step(
self,
valuation_id: int,
formula_code: str,
*,
status: str = "processing",
input_params: Optional[Dict[str, Any]] = None,
output_result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
step_description: Optional[str] = None,
duration_ms: Optional[int] = None,
) -> ValuationCalculationStepOut:
"""
幂等记录或更新某个公式节点的计算过程
"""
meta = get_formula_meta(formula_code)
description = step_description or meta.formula
create_payload: Dict[str, Any] = {
"valuation_id": valuation_id,
"formula_code": meta.code,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
"step_description": description,
"status": status,
}
if input_params is not None:
create_payload["input_params"] = input_params
if output_result is not None:
create_payload["output_result"] = output_result
if error_message is not None:
create_payload["error_message"] = error_message
# 准备更新字段
update_fields: Dict[str, Any] = {
"status": status,
"step_description": description,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
}
if input_params is not None:
update_fields["input_params"] = input_params
if output_result is not None:
update_fields["output_result"] = output_result
if error_message is not None:
update_fields["error_message"] = error_message
if duration_ms is not None:
result = update_fields.get("output_result") or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
update_fields["output_result"] = result
# 先尝试查询是否存在(明确排除 formula_code 为 NULL 的情况)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
# 如果没找到,再检查是否有 formula_code 为 NULL 的旧记录(不应该有,但为了安全)
if not step and meta.code:
# 检查是否有重复的旧记录formula_code 为 NULL
old_steps = await self.step_model.filter(
valuation_id=valuation_id,
formula_code__isnull=True
).all()
if old_steps:
logger.warning(
"calcstep.log_formula found old records with NULL formula_code: valuation_id={} count={}",
valuation_id,
len(old_steps),
)
logger.info(
"calcstep.log_formula query: valuation_id={} formula_code={} found={}",
valuation_id,
meta.code,
step is not None,
)
if step:
# 更新现有记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 尝试创建新记录
if duration_ms is not None:
result = create_payload.setdefault("output_result", {}) or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
create_payload["output_result"] = result
try:
step = await self.step_model.create(**create_payload)
logger.info(
"calcstep.log_formula created valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
except Exception as e:
# 如果因为唯一约束冲突而失败(可能是并发插入),重新查询并更新
error_str = str(e).lower()
if "duplicate" in error_str or "unique" in error_str or "1062" in error_str:
logger.warning(
"calcstep.log_formula duplicate key detected, retrying query: {}",
str(e),
)
# 重新查询(可能已被其他请求插入)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
if step:
# 更新刚插入的记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated after duplicate key: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 如果还是找不到,记录错误但继续
logger.error(
"calcstep.log_formula failed to find record after duplicate key error: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
raise
else:
# 其他错误直接抛出
raise
return ValuationCalculationStepOut.model_validate(step)
async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut:
step = await self.step_model.filter(id=step_id).first()
if not step:
@ -71,6 +219,247 @@ class ValuationController:
logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps))
return [ValuationCalculationStepOut.model_validate(step) for step in steps]
async def get_calculation_report_markdown(self, valuation_id: int) -> str:
"""
根据估值ID生成计算过程的 Markdown 报告
此方法会查询所有相关的计算步骤按照公式的层级关系组织
并生成格式化的 Markdown 文档包含
- 公式名称和说明
- 输入参数
- 输出结果
- 计算状态
- 错误信息如果有
Args:
valuation_id (int): 估值的唯一标识符
Returns:
str: Markdown 格式的计算报告
Raises:
ValueError: 如果找不到对应的估值记录
"""
# 验证估值记录是否存在
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
raise ValueError(f"估值记录不存在: {valuation_id}")
# 获取所有计算步骤
steps = await self.step_model.filter(valuation_id=valuation_id).order_by('step_order')
if not steps:
return f"# 估值计算报告\n\n**估值ID**: {valuation_id}\n\n**资产名称**: {valuation.asset_name}\n\n> 暂无计算步骤记录。\n"
# 转换为字典列表,便于处理
steps_data = []
for step in steps:
step_dict = ValuationCalculationStepOut.model_validate(step).model_dump()
steps_data.append(step_dict)
# 构建公式树形结构
formula_tree = self._build_formula_tree(steps_data)
# 生成 Markdown
markdown = self._generate_markdown(valuation, formula_tree)
logger.info("calcstep.report_markdown generated valuation_id={} steps_count={}", valuation_id, len(steps_data))
return markdown
def _build_formula_tree(self, steps: List[Dict]) -> Dict:
"""
构建公式的树形结构
Args:
steps: 计算步骤列表
Returns:
Dict: 树形结构的字典key formula_codevalue 为步骤数据和子节点
"""
# 按 formula_code 索引
step_map = {}
for step in steps:
code = step.get('formula_code')
if code:
step_map[code] = step
# 构建树形结构
tree = {}
processed = set()
# 第一遍:创建所有节点
for step in steps:
code = step.get('formula_code')
if not code or code in processed:
continue
node = {
'step': step,
'children': []
}
tree[code] = node
processed.add(code)
# 第二遍:建立父子关系
root_nodes = []
for step in steps:
code = step.get('formula_code')
if not code:
continue
parent_code = step.get('parent_formula_code')
node = tree[code]
if parent_code and parent_code in tree:
# 有父节点,添加到父节点的 children
tree[parent_code]['children'].append(node)
else:
# 根节点
root_nodes.append(node)
# 按 step_order 排序
def sort_nodes(nodes):
nodes.sort(key=lambda n: float(n['step'].get('step_order', 0)))
for node in nodes:
if node['children']:
sort_nodes(node['children'])
sort_nodes(root_nodes)
return {'roots': root_nodes, 'all': tree}
def _generate_markdown(self, valuation, formula_tree: Dict) -> str:
"""
生成 Markdown 格式的报告
Args:
valuation: 估值评估对象
formula_tree: 公式树形结构
Returns:
str: Markdown 格式的字符串
"""
lines = []
# 标题和基本信息
lines.append("# 估值计算报告")
lines.append("")
lines.append("## 基本信息")
lines.append("")
lines.append("| 字段 | 值 |")
lines.append("|------|----|")
lines.append(f"| 估值ID | {valuation.id} |")
lines.append(f"| 资产名称 | {valuation.asset_name} |")
lines.append(f"| 所属机构 | {valuation.institution} |")
lines.append(f"| 所属行业 | {valuation.industry} |")
heritage_value = valuation.heritage_level if valuation.heritage_level else "-"
lines.append(f"| 非遗等级 | {heritage_value} |")
created_at_str = valuation.created_at.strftime("%Y-%m-%d %H:%M:%S") if valuation.created_at else "N/A"
lines.append(f"| 创建时间 | {created_at_str} |")
lines.append("")
# 计算结果摘要
if valuation.final_value_ab is not None:
lines.append("## 计算结果摘要")
lines.append("")
lines.append("| 项目 | 数值(万元) |")
lines.append("|------|-------------|")
if valuation.model_value_b is not None:
lines.append(f"| 模型估值B | {valuation.model_value_b:.2f} |")
if valuation.market_value_c is not None:
lines.append(f"| 市场估值C | {valuation.market_value_c:.2f} |")
lines.append(f"| **最终估值AB** | **{valuation.final_value_ab:.2f}** |")
if valuation.dynamic_pledge_rate is not None:
lines.append(f"| 动态质押率 | {valuation.dynamic_pledge_rate:.4f} |")
lines.append("")
# 详细计算过程
lines.append("## 详细计算过程")
lines.append("")
def _format_json_block(value: Any, indent_prefix: str = "") -> List[str]:
"""格式化 JSON 代码块"""
json_text = json.dumps(value, ensure_ascii=False, indent=2)
block_lines = [f"{indent_prefix}```json"]
for line in json_text.splitlines():
block_lines.append(f"{indent_prefix}{line}")
block_lines.append(f"{indent_prefix}```")
return block_lines
# 递归生成公式树
heading_levels = ["####", "#####", "######", "######", "######"]
def render_node(node: Dict, level: int = 0, prefix: str = ""):
step = node['step']
heading = heading_levels[min(level, len(heading_levels) - 1)]
name = step.get('formula_name', step.get('step_name', '未知'))
formula_text = step.get('formula_text', step.get('step_description', ''))
status = step.get('status', 'unknown')
input_params = step.get('input_params')
output_result = step.get('output_result')
error_message = step.get('error_message')
duration_ms = None
if output_result and isinstance(output_result, dict):
duration_ms = output_result.get('duration_ms')
lines.append(f"{heading} {prefix}{name}")
lines.append("")
# 公式说明
if formula_text:
if level == 0:
lines.append(f"> {formula_text}")
lines.append("")
else:
lines.append("计算公式:")
lines.append(f"`{formula_text}`")
lines.append("")
# 状态和耗时
status_label = {
'processing': '计算中',
'completed': '已完成',
'failed': '计算失败'
}.get(status, status)
lines.append(f"**状态**: {status_label}")
if duration_ms is not None:
lines.append(f"**耗时**: {duration_ms}ms")
lines.append("")
# 输入参数
if input_params:
lines.append("**输入参数**:")
lines.extend(_format_json_block(input_params, ""))
lines.append("")
# 输出结果
if output_result:
# 移除 duration_ms因为已经在状态中显示了
result_display = {k: v for k, v in output_result.items() if k != 'duration_ms'}
if result_display:
lines.append("**输出结果**:")
lines.extend(_format_json_block(result_display, ""))
lines.append("")
# 错误信息
if error_message:
lines.append(f"> ⚠️ **错误**: {error_message}")
lines.append("")
# 子节点
children = node.get('children', [])
if children:
for idx, child in enumerate(children, start=1):
lines.append("")
child_prefix = f"{prefix}{idx}."
render_node(child, level + 1, child_prefix)
lines.append("")
# 渲染所有根节点
for idx, root in enumerate(formula_tree['roots'], start=1):
prefix = f"{idx}."
render_node(root, 0, prefix)
return "\n".join(lines)
async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut:
"""创建估值评估"""
# 将用户ID添加到数据中
@ -159,17 +548,9 @@ class ValuationController:
if query.is_active is not None:
queryset = queryset.filter(is_active=query.is_active)
# 添加状态筛选(支持 completed 聚合以及旧值兼容)
# 添加状态筛选
if hasattr(query, 'status') and query.status:
s = query.status
if s == 'completed':
queryset = queryset.filter(status__in=['success'])
elif s == 'approved':
queryset = queryset.filter(status='success')
elif s == 'rejected':
queryset = queryset.filter(status='fail')
else:
queryset = queryset.filter(status=s)
queryset = queryset.filter(status=query.status)
if getattr(query, 'phone', None):
queryset = queryset.filter(user__phone__icontains=query.phone)
@ -188,8 +569,8 @@ class ValuationController:
except Exception:
return None
s_dt = _parse_time(getattr(query, 'submitted_start', None) or getattr(query, 'create_start', None))
e_dt = _parse_time(getattr(query, 'submitted_end', None) or getattr(query, 'create_end', None))
s_dt = _parse_time(getattr(query, 'submitted_start', None))
e_dt = _parse_time(getattr(query, 'submitted_end', None))
if s_dt:
queryset = queryset.filter(created_at__gte=s_dt)
if e_dt:

View File

@ -284,7 +284,90 @@ async def init_apis():
await api_controller.refresh_api()
async def _ensure_unique_index():
"""确保 valuation_calculation_steps 表的唯一索引存在"""
try:
conn_alias = settings.TORTOISE_ORM["apps"]["models"]["default_connection"]
from tortoise import connections
conn = connections.get(conn_alias)
# 检查表是否存在
result = await conn.execute_query(
"SHOW TABLES LIKE 'valuation_calculation_steps'"
)
if not result or len(result[1]) == 0:
logger.info("Table valuation_calculation_steps does not exist, skipping index check")
return
# 检查唯一索引是否存在
# 查找包含 valuation_id 和 formula_code 的唯一索引
index_result = await conn.execute_query(
"SHOW INDEX FROM `valuation_calculation_steps` WHERE Non_unique = 0 AND Column_name IN ('valuation_id', 'formula_code')"
)
# 查找是否存在 (valuation_id, formula_code) 的唯一索引
# 对于复合索引SHOW INDEX 会返回多行,每行对应一个列
# 需要检查是否有同一个 Key_name 包含两个列
has_unique_index = False
if index_result and len(index_result) > 1:
# 按 Key_name 分组
index_groups = {}
for row in index_result[1]:
if len(row) >= 5:
key_name = row[2] if len(row) > 2 else ""
non_unique = row[1] if len(row) > 1 else 1
column_name = row[4] if len(row) > 4 else ""
seq_in_index = row[3] if len(row) > 3 else 0
if non_unique == 0 and column_name in ('valuation_id', 'formula_code'):
if key_name not in index_groups:
index_groups[key_name] = []
index_groups[key_name].append(column_name)
# 检查是否有索引包含两个列
for key_name, columns in index_groups.items():
if 'valuation_id' in columns and 'formula_code' in columns:
has_unique_index = True
logger.debug(f"Found unique index: {key_name} on (valuation_id, formula_code)")
break
if not has_unique_index:
logger.warning("Unique index on (valuation_id, formula_code) not found, attempting to create...")
try:
# 先删除可能存在的重复记录
await conn.execute_query("""
DELETE t1 FROM `valuation_calculation_steps` t1
INNER JOIN `valuation_calculation_steps` t2
WHERE t1.id > t2.id
AND t1.valuation_id = t2.valuation_id
AND t1.formula_code = t2.formula_code
AND t1.formula_code IS NOT NULL
""")
logger.info("Cleaned up duplicate records")
# 创建唯一索引
await conn.execute_query("""
CREATE UNIQUE INDEX `uidx_valuation_formula`
ON `valuation_calculation_steps` (`valuation_id`, `formula_code`)
""")
logger.info("Created unique index on (valuation_id, formula_code)")
except Exception as idx_err:
error_str = str(idx_err).lower()
if "duplicate key name" in error_str or "already exists" in error_str:
logger.info("Unique index already exists (different name)")
else:
logger.warning(f"Failed to create unique index: {idx_err}")
else:
logger.debug("Unique index on (valuation_id, formula_code) already exists")
except Exception as e:
logger.warning(f"Failed to ensure unique index: {e}")
async def init_db():
import os
from pathlib import Path
from tortoise import Tortoise
from tortoise.exceptions import OperationalError
command = Command(tortoise_config=settings.TORTOISE_ORM)
try:
await command.init_db(safe=True)
@ -292,14 +375,84 @@ async def init_db():
pass
await command.init()
# 检查并清理可能冲突的迁移文件(避免交互式提示)
# Aerich 在检测到迁移文件已存在时会交互式提示,我们提前删除冲突文件
migrations_dir = Path("migrations/models")
if migrations_dir.exists():
# 查找包含 "update" 的迁移文件(通常是自动生成的冲突文件)
for migration_file in migrations_dir.glob("*update*.py"):
if migration_file.name != "__init__.py":
logger.info(f"Removing conflicting migration file: {migration_file.name}")
migration_file.unlink()
# 尝试执行 migrate
try:
await command.migrate()
except AttributeError:
logger.warning("unable to retrieve model history from database, model history will be created from scratch")
shutil.rmtree("migrations")
await command.init_db(safe=True)
except Exception as e:
# 如果 migrate 失败,记录警告但继续执行 upgrade
logger.warning(f"Migrate failed: {e}, continuing with upgrade...")
await command.upgrade(run_in_transaction=True)
# 在 upgrade 之前,先检查表是否存在,如果不存在则先创建表
try:
await command.upgrade(run_in_transaction=True)
# upgrade 成功后,验证并修复唯一索引
await _ensure_unique_index()
except (OperationalError, Exception) as e:
error_msg = str(e)
# 如果是因为表不存在而失败,先让 Tortoise 生成表结构
if "doesn't exist" in error_msg.lower() or ("table" in error_msg.lower() and "valuation_calculation_steps" in error_msg):
logger.warning(f"Table not found during upgrade: {error_msg}, generating schemas first...")
# 确保 Tortoise 已初始化Aerich 的 init 应该已经初始化了,但为了安全再检查)
try:
# 生成表结构safe=True 表示如果表已存在则跳过)
await Tortoise.generate_schemas(safe=True)
logger.info("Tables generated successfully, retrying upgrade...")
# 重新尝试 upgrade这次应该会成功因为表已经存在
try:
await command.upgrade(run_in_transaction=True)
except Exception as upgrade_err:
# 如果 upgrade 仍然失败,可能是迁移文件的问题,记录警告但继续
logger.warning(f"Upgrade still failed after generating schemas: {upgrade_err}, continuing anyway...")
except Exception as gen_err:
logger.error(f"Failed to generate schemas: {gen_err}")
raise
# 如果是重复字段错误,说明迁移已经执行过,直接跳过并确保索引
elif "duplicate column name" in error_msg.lower():
logger.warning(f"Duplicate column detected during upgrade: {error_msg}, skipping migration step and ensuring schema integrity...")
await _ensure_unique_index()
# 如果是重复索引错误,删除表并重新创建(最简单可靠的方法)
elif "duplicate key" in error_msg.lower() or "duplicate key name" in error_msg.lower():
logger.warning(f"Duplicate index detected: {error_msg}, dropping and recreating table...")
try:
# Aerich 的 command.init() 已经初始化了 Tortoise直接使用连接
# 连接别名是 "mysql"(从配置中读取)
conn_alias = settings.TORTOISE_ORM["apps"]["models"]["default_connection"]
from tortoise import connections
# 尝试获取连接,如果失败则重新初始化
try:
conn = connections.get(conn_alias)
except Exception:
# 如果连接不存在,重新初始化 Tortoise
await Tortoise.init(config=settings.TORTOISE_ORM)
conn = connections.get(conn_alias)
# 删除表
await conn.execute_query("DROP TABLE IF EXISTS `valuation_calculation_steps`")
logger.info("Dropped valuation_calculation_steps table")
# 重新生成表结构(包含正确的唯一索引)
# 使用 safe=True 避免尝试创建已存在的其他表(如 user_role只创建不存在的表
await Tortoise.generate_schemas(safe=True)
logger.info("Table regenerated successfully with correct unique index")
except Exception as recreate_err:
logger.error(f"Failed to recreate table: {recreate_err}")
raise
else:
raise
async def init_roles():

View File

@ -149,7 +149,8 @@ class HttpAuditLogMiddleware(BaseHTTPMiddleware):
try:
return json.loads(stripped)
except (ValueError, TypeError):
return stripped
# 将非 JSON 字符串包装为字典,以便 JSONField 能够正确存储
return {"text": stripped}
if isinstance(value, (dict, list, int, float, bool)):
return value

View File

@ -30,7 +30,7 @@ class Invoice(BaseModel, TimestampMixin):
register_phone = fields.CharField(max_length=32, description="注册电话")
bank_name = fields.CharField(max_length=128, description="开户银行")
bank_account = fields.CharField(max_length=64, description="银行账号")
status = fields.CharField(max_length=16, description="状态: pending|invoiced|refunded", index=True, default="pending")
status = fields.CharField(max_length=16, description="状态: pending|invoiced|rejected|refunded", index=True, default="pending")
app_user_id = fields.IntField(null=True, description="App用户ID", index=True)
header = fields.ForeignKeyField("models.InvoiceHeader", related_name="invoices", null=True, description="抬头关联")
wechat = fields.CharField(max_length=64, null=True, description="微信号", index=True)

View File

@ -82,7 +82,7 @@ class ValuationAssessment(Model):
# 系统字段
user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户")
status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已完成)")
status = fields.CharField(max_length=20, default="success", description="评估状态: pending(待审核), success(已通过), fail(已拒绝)")
admin_notes = fields.TextField(null=True, description="管理员备注")
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")
@ -101,19 +101,28 @@ class ValuationCalculationStep(Model):
"""估值计算步骤模型"""
id = fields.IntField(pk=True, description="主键ID")
valuation = fields.ForeignKeyField("models.ValuationAssessment", related_name="calculation_steps", description="关联的估值评估")
formula_code = fields.CharField(max_length=64, null=True, description="公式编码")
formula_name = fields.CharField(max_length=255, null=True, description="公式名称")
formula_text = fields.TextField(null=True, description="公式说明")
parent_formula_code = fields.CharField(max_length=64, null=True, description="父级公式编码")
group_code = fields.CharField(max_length=64, null=True, description="分组编码")
step_order = fields.DecimalField(max_digits=8, decimal_places=3, description="步骤顺序")
step_name = fields.CharField(max_length=255, description="步骤名称")
step_description = fields.TextField(null=True, description="步骤描述")
input_params = fields.JSONField(null=True, description="输入参数")
output_result = fields.JSONField(null=True, description="输出结果")
status = fields.CharField(max_length=20, default="SUCCESS", description="步骤状态: SUCCESS, FAILED")
status = fields.CharField(max_length=20, default="processing", description="步骤状态: processing, completed, failed")
error_message = fields.TextField(null=True, description="错误信息")
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")
class Meta:
table = "valuation_calculation_steps"
table_description = "估值计算步骤表"
ordering = ["step_order"]
# 唯一索引同一估值ID下同一公式编码只能有一条记录
# 注意formula_code 允许为 NULL但新逻辑中 formula_code 总是有值
unique_together = [("valuation", "formula_code")]
def __str__(self):
return f"估值ID {self.valuation_id} - 步骤 {self.step_order}: {self.step_name}"

View File

@ -97,7 +97,7 @@ class InvoiceList(BaseModel):
class UpdateStatus(BaseModel):
id: int
status: str = Field(..., pattern=r"^(pending|invoiced|refunded)$")
status: str = Field(..., pattern=r"^(pending|invoiced|rejected|refunded)$")
class UpdateType(BaseModel):

View File

@ -325,13 +325,11 @@ class ValuationAssessmentQuery(BaseModel):
institution: Optional[str] = Field(None, description="所属机构")
industry: Optional[str] = Field(None, description="所属行业")
heritage_level: Optional[str] = Field(None, description="非遗等级")
status: Optional[str] = Field(None, description="评估状态: pending(待审核), completed(已完成)")
status: Optional[str] = Field(None, description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)")
is_active: Optional[bool] = Field(None, description="是否激活")
phone: Optional[str] = Field(None, description="手机号模糊查询")
submitted_start: Optional[str] = Field(None, description="提交时间开始毫秒时间戳或ISO字符串")
submitted_end: Optional[str] = Field(None, description="提交时间结束毫秒时间戳或ISO字符串")
create_start: Optional[str] = Field(None, description="创建时间开始毫秒时间戳或ISO字符串")
create_end: Optional[str] = Field(None, description="创建时间结束毫秒时间戳或ISO字符串")
audited_start: Optional[str] = Field(None, description="审核时间开始证书修改时间毫秒时间戳或ISO字符串")
audited_end: Optional[str] = Field(None, description="审核时间结束证书修改时间毫秒时间戳或ISO字符串")
page: int = Field(1, ge=1, description="页码")
@ -356,8 +354,13 @@ class ValuationCalculationStepBase(BaseModel):
step_description: Optional[str] = Field(None, description="步骤描述")
input_params: Optional[Dict[str, Any]] = Field(None, description="输入参数")
output_result: Optional[Dict[str, Any]] = Field(None, description="输出结果")
status: str = Field(..., description="步骤状态")
status: str = Field(..., description="步骤状态: processing/completed/failed")
error_message: Optional[str] = Field(None, description="错误信息")
formula_code: Optional[str] = Field(None, description="公式编码")
formula_name: Optional[str] = Field(None, description="公式名称")
formula_text: Optional[str] = Field(None, description="公式说明")
parent_formula_code: Optional[str] = Field(None, description="父级公式编码")
group_code: Optional[str] = Field(None, description="分组编码")
@field_validator('step_order', mode='before')
@classmethod
@ -382,6 +385,7 @@ class ValuationCalculationStepOut(ValuationCalculationStepBase):
id: int = Field(..., description="主键ID")
valuation_id: int = Field(..., description="关联的估值评估ID")
created_at: datetime = Field(..., description="创建时间")
updated_at: datetime = Field(..., description="更新时间")
class Config:
from_attributes = True

View File

@ -1,53 +1,53 @@
'''
这是非物质文化遗产IP知识产权评估系统的核心计算引擎包
'''
from app.utils.calculation_engine.economic_value_b1 import EconomicValueB1Calculator
from app.utils.calculation_engine.economic_value_b1.sub_formulas import (
BasicValueB11Calculator,
TrafficFactorB12Calculator,
PolicyMultiplierB13Calculator
)
from app.utils.calculation_engine.cultural_value_b2 import CulturalValueB2Calculator
from app.utils.calculation_engine.cultural_value_b2.sub_formulas import (
LivingHeritageB21Calculator,
PatternGeneB22Calculator
)
from app.utils.calculation_engine.risk_adjustment_b3 import RiskAdjustmentB3Calculator
from app.utils.calculation_engine.market_value_c import MarketValueCCalculator
from app.utils.calculation_engine.market_value_c.sub_formulas import (
MarketBiddingC1Calculator,
HeatCoefficientC2Calculator,
ScarcityMultiplierC3Calculator,
TemporalDecayC4Calculator
)
from app.utils.calculation_engine.final_value_ab import FinalValueACalculator
"""
非遗资产估值计算引擎包
提供各类计算器并通过懒加载避免循环依赖
"""
from importlib import import_module
from typing import Any
__version__ = "1.0.0"
__author__ = "Assessment Team"
__all__ = [
# 经济价值B1模块
"EconomicValueB1Calculator",
"BasicValueB11Calculator",
"TrafficFactorB12Calculator",
"PolicyMultiplierB13Calculator",
# 文化价值B2模块
"CulturalValueB2Calculator",
"LivingHeritageB21Calculator",
"PatternGeneB22Calculator",
# 风险调整系数B3模块
"RiskAdjustmentB3Calculator",
# 市场估值C模块
"MarketValueCCalculator",
"MarketBiddingC1Calculator",
"HeatCoefficientC2Calculator",
"ScarcityMultiplierC3Calculator",
"TemporalDecayC4Calculator",
# 最终估值A模块
"FinalValueACalculator"
"FinalValueACalculator",
]
_EXPORT_MAP = {
"EconomicValueB1Calculator": "app.utils.calculation_engine.economic_value_b1",
"BasicValueB11Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11",
"TrafficFactorB12Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12",
"PolicyMultiplierB13Calculator": "app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13",
"CulturalValueB2Calculator": "app.utils.calculation_engine.cultural_value_b2.cultural_value_b2",
"LivingHeritageB21Calculator": "app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21",
"PatternGeneB22Calculator": "app.utils.calculation_engine.cultural_value_b2.sub_formulas.pattern_gene_b22",
"RiskAdjustmentB3Calculator": "app.utils.calculation_engine.risk_adjustment_b3.sub_formulas.risk_adjustment_b3",
"MarketValueCCalculator": "app.utils.calculation_engine.market_value_c.market_value_c",
"MarketBiddingC1Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.market_bidding_c1",
"HeatCoefficientC2Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.heat_coefficient_c2",
"ScarcityMultiplierC3Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.scarcity_multiplier_c3",
"TemporalDecayC4Calculator": "app.utils.calculation_engine.market_value_c.sub_formulas.temporal_decay_c4",
"FinalValueACalculator": "app.utils.calculation_engine.final_value_ab.final_value_a",
}
def __getattr__(name: str) -> Any:
module_path = _EXPORT_MAP.get(name)
if not module_path:
raise AttributeError(f"module {__name__} has no attribute {name}")
module = import_module(module_path)
attr = getattr(module, name)
globals()[name] = attr
return attr

View File

@ -18,13 +18,12 @@ try:
from .sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
from .sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError:
# 绝对导入(当直接运行时)
from sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
from sub_formulas.pattern_gene_b22 import PatternGeneB22Calculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
from app.controllers.valuation import ValuationController
class CulturalValueB2Calculator:
@ -54,7 +53,7 @@ class CulturalValueB2Calculator:
return cultural_value
async def calculate_complete_cultural_value_b2(self, valuation_id: int, input_data: Dict) -> float:
async def calculate_complete_cultural_value_b2(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
"""
计算完整的文化价值B2并记录所有计算步骤
@ -73,38 +72,59 @@ class CulturalValueB2Calculator:
}
Returns:
float: 计算得出的文化价值B2
Dict[str, float]: 包含文化价值B2及子公式结果的字典
Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出
"""
step = await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=2.2,
step_name="文化价值B2计算",
step_description="开始计算文化价值B2公式为活态传承系数B21 × 0.6 + (纹样基因值B22 / 10) × 0.4",
input_params=input_data,
status="in_progress"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B2",
status="processing",
input_params=input_data,
)
try:
# 计算活态传承系数B21
teaching_frequency = self.living_heritage_calculator.calculate_teaching_frequency(
input_data["offline_sessions"],
input_data["douyin_views"],
input_data["kuaishou_views"],
input_data["bilibili_views"]
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B21_TEACHING_FREQ",
status="completed",
input_params={
"offline_sessions": input_data.get("offline_sessions"),
"douyin_views": input_data.get("douyin_views"),
"kuaishou_views": input_data.get("kuaishou_views"),
"bilibili_views": input_data.get("bilibili_views"),
},
output_result={"teaching_frequency": teaching_frequency},
)
living_heritage_b21 = self.living_heritage_calculator.calculate_living_heritage_b21(
input_data['inheritor_level_coefficient'],
self.living_heritage_calculator.calculate_teaching_frequency(
input_data["offline_sessions"],
input_data["douyin_views"],
input_data["kuaishou_views"],
input_data["bilibili_views"]
),
teaching_frequency,
input_data['cross_border_depth']
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.21, step_name="活态传承系数B21",
output_result={'living_heritage_b21': living_heritage_b21}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B21",
status="completed",
input_params={
"inheritor_level_coefficient": input_data.get("inheritor_level_coefficient"),
"offline_sessions": input_data.get("offline_sessions"),
"douyin_views": input_data.get("douyin_views"),
"kuaishou_views": input_data.get("kuaishou_views"),
"bilibili_views": input_data.get("bilibili_views"),
"cross_border_depth": input_data.get("cross_border_depth"),
},
output_result={
"living_heritage_b21": living_heritage_b21,
"teaching_frequency": teaching_frequency,
},
)
# 计算纹样基因值B22
@ -113,11 +133,16 @@ class CulturalValueB2Calculator:
input_data['normalized_entropy'],
input_data['historical_inheritance']
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.22, step_name="纹样基因值B22",
output_result={'pattern_gene_b22': pattern_gene_b22}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B22",
status="completed",
input_params={
"structure_complexity": input_data.get("structure_complexity"),
"normalized_entropy": input_data.get("normalized_entropy"),
"historical_inheritance": input_data.get("historical_inheritance"),
},
output_result={"pattern_gene_b22": pattern_gene_b22},
)
# 计算文化价值B2
@ -126,14 +151,24 @@ class CulturalValueB2Calculator:
pattern_gene_b22
)
await self.valuation_controller.update_calculation_step(
step.id, {"status": "completed", "output_result": {"cultural_value_b2": cultural_value_b2}}
result = {
"cultural_value_b2": cultural_value_b2,
"living_heritage_b21": living_heritage_b21,
"pattern_gene_b22": pattern_gene_b22,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B2",
status="completed",
output_result=result,
)
return cultural_value_b2
return result
except Exception as e:
error_message = f"文化价值B2计算失败: {e}"
await self.valuation_controller.update_calculation_step(
step.id, {"status": "failed", "error_message": error_message}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_CULTURAL_B2",
status="failed",
error_message=str(e),
)
raise

View File

@ -7,7 +7,6 @@
from typing import Dict
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
from .sub_formulas.basic_value_b11 import BasicValueB11Calculator
from .sub_formulas.traffic_factor_b12 import TrafficFactorB12Calculator
@ -46,7 +45,7 @@ class EconomicValueB1Calculator:
return economic_value
async def calculate_complete_economic_value_b1(self, valuation_id: int, input_data: Dict) -> float:
async def calculate_complete_economic_value_b1(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
"""
计算完整的经济价值B1并记录所有计算步骤
@ -66,81 +65,229 @@ class EconomicValueB1Calculator:
}
Returns:
float: 计算得出的经济价值B1
Dict[str, float]: 包含经济价值B1及各子公式结果的字典
Raises:
Exception: 在计算过程中发生的任何异常都会被捕获记录并重新抛出
"""
step = await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=2.1,
step_name="经济价值B1计算",
step_description="开始计算经济价值B1公式为基础价值B11 × (1 + 流量因子B12) × 政策乘数B13",
input_params=input_data,
status="in_progress"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B1",
status="processing",
input_params=input_data,
)
try:
# 计算基础价值B11
financial_value = self.basic_value_calculator.calculate_financial_value_f(input_data["three_year_income"])
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_FINANCIAL_VALUE",
status="completed",
input_params={"three_year_income": input_data.get("three_year_income")},
output_result={"financial_value_f": financial_value},
)
legal_strength = self.basic_value_calculator.calculate_legal_strength_l(
input_data["patent_score"],
input_data["popularity_score"],
input_data["infringement_score"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_LEGAL_STRENGTH",
status="completed",
input_params={
"patent_score": input_data.get("patent_score"),
"popularity_score": input_data.get("popularity_score"),
"infringement_score": input_data.get("infringement_score"),
},
output_result={"legal_strength_l": legal_strength},
)
development_potential = self.basic_value_calculator.calculate_development_potential_d(
input_data["patent_count"],
input_data["esg_score"],
input_data["innovation_ratio"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_DEVELOPMENT_POTENTIAL",
status="completed",
input_params={
"patent_count": input_data.get("patent_count"),
"esg_score": input_data.get("esg_score"),
"innovation_ratio": input_data.get("innovation_ratio"),
},
output_result={"development_potential_d": development_potential},
)
industry_coefficient = input_data["industry_coefficient"]
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11_INDUSTRY_COEFFICIENT",
status="completed",
input_params={"industry_coefficient": industry_coefficient},
output_result={"industry_coefficient": industry_coefficient},
)
basic_value_b11 = self.basic_value_calculator.calculate_basic_value_b11(
self.basic_value_calculator.calculate_financial_value_f(input_data["three_year_income"]),
self.basic_value_calculator.calculate_legal_strength_l(input_data["patent_score"], input_data["popularity_score"], input_data["infringement_score"]),
self.basic_value_calculator.calculate_development_potential_d(input_data["patent_count"], input_data["esg_score"], input_data["innovation_ratio"]),
input_data["industry_coefficient"]
financial_value,
legal_strength,
development_potential,
industry_coefficient,
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.11, step_name="基础价值B11",
output_result={'basic_value_b11': basic_value_b11}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B11",
status="completed",
input_params={
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
"industry_coefficient": industry_coefficient,
},
output_result={
"basic_value_b11": basic_value_b11,
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
"industry_coefficient": industry_coefficient,
},
)
interaction_index = self.traffic_factor_calculator.calculate_interaction_index(
input_data["likes"],
input_data["comments"],
input_data["shares"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_INTERACTION_INDEX",
status="completed",
input_params={
"likes": input_data.get("likes"),
"comments": input_data.get("comments"),
"shares": input_data.get("shares"),
},
output_result={"interaction_index": interaction_index},
)
coverage_index = self.traffic_factor_calculator.calculate_coverage_index(input_data.get("followers", 0))
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_COVERAGE_INDEX",
status="completed",
input_params={"followers": input_data.get("followers", 0)},
output_result={"coverage_index": coverage_index},
)
conversion_efficiency = self.traffic_factor_calculator.calculate_conversion_efficiency(
input_data["sales_volume"],
input_data["link_views"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_CONVERSION_EFFICIENCY",
status="completed",
input_params={
"sales_volume": input_data.get("sales_volume"),
"link_views": input_data.get("link_views"),
},
output_result={"conversion_efficiency": conversion_efficiency},
)
social_media_spread_s3 = self.traffic_factor_calculator.calculate_social_media_spread_s3(
interaction_index,
coverage_index,
conversion_efficiency,
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12_SOCIAL_SPREAD",
status="completed",
input_params={
"interaction_index": interaction_index,
"coverage_index": coverage_index,
"conversion_efficiency": conversion_efficiency,
},
output_result={"social_media_spread_s3": social_media_spread_s3},
)
# 计算流量因子B12
traffic_factor_b12 = self.traffic_factor_calculator.calculate_traffic_factor_b12(
input_data['search_index_s1'],
input_data['industry_average_s2'],
self.traffic_factor_calculator.calculate_social_media_spread_s3(
self.traffic_factor_calculator.calculate_interaction_index(input_data["likes"], input_data["comments"], input_data["shares"]),
self.traffic_factor_calculator.calculate_coverage_index(0),
self.traffic_factor_calculator.calculate_conversion_efficiency(input_data["sales_volume"], input_data["link_views"])
)
input_data["search_index_s1"],
input_data["industry_average_s2"],
social_media_spread_s3,
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.12, step_name="流量因子B12",
output_result={'traffic_factor_b12': traffic_factor_b12}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_FLOW_B12",
status="completed",
input_params={
"search_index_s1": input_data.get("search_index_s1"),
"industry_average_s2": input_data.get("industry_average_s2"),
},
output_result={
"traffic_factor_b12": traffic_factor_b12,
"social_media_spread_s3": social_media_spread_s3,
},
)
policy_compatibility = self.policy_multiplier_calculator.calculate_policy_compatibility_score(
input_data["policy_match_score"],
input_data["implementation_stage"],
input_data["funding_support"],
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_POLICY_B13",
status="processing",
input_params={
"policy_match_score": input_data.get("policy_match_score"),
"implementation_stage": input_data.get("implementation_stage"),
"funding_support": input_data.get("funding_support"),
},
output_result={"policy_compatibility_score": policy_compatibility},
)
# 计算政策乘数B13
policy_multiplier_b13 = self.policy_multiplier_calculator.calculate_policy_multiplier_b13(
self.policy_multiplier_calculator.calculate_policy_compatibility_score(
input_data["policy_match_score"], input_data["implementation_stage"], input_data["funding_support"]
)
policy_compatibility,
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.13, step_name="政策乘数B13",
output_result={'policy_multiplier_b13': policy_multiplier_b13}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_POLICY_B13",
status="completed",
output_result={
"policy_multiplier_b13": policy_multiplier_b13,
"policy_compatibility_score": policy_compatibility,
},
)
# 计算经济价值B1
economic_value_b1 = self.calculate_economic_value_b1(
basic_value_b11,
traffic_factor_b12,
policy_multiplier_b13
policy_multiplier_b13,
)
await self.valuation_controller.update_calculation_step(
step.id, {"status": "completed", "output_result": {"economic_value_b1": economic_value_b1}}
result = {
"economic_value_b1": economic_value_b1,
"basic_value_b11": basic_value_b11,
"traffic_factor_b12": traffic_factor_b12,
"policy_multiplier_b13": policy_multiplier_b13,
"financial_value_f": financial_value,
"legal_strength_l": legal_strength,
"development_potential_d": development_potential,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B1",
status="completed",
output_result=result,
)
return economic_value_b1
return result
except Exception as e:
error_message = f"经济价值B1计算失败: {e}"
await self.valuation_controller.update_calculation_step(
step.id, {"status": "failed", "error_message": error_message}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_ECON_B1",
status="failed",
error_message=str(e),
)
raise

View File

@ -20,13 +20,11 @@ try:
from .model_value_b import ModelValueBCalculator
from ..market_value_c import MarketValueCCalculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError:
# 直接运行时的绝对导入
from app.utils.calculation_engine.final_value_ab.model_value_b import ModelValueBCalculator
from app.utils.calculation_engine.market_value_c import MarketValueCCalculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
class FinalValueACalculator:
@ -94,25 +92,17 @@ class FinalValueACalculator:
"""
import time
start_time = time.time()
step_order = 1
# 记录输入参数
logger.info("final_value_a.calculation_start input_data_keys={} model_data_keys={} market_data_keys={}",
list(input_data.keys()),
list(input_data.get('model_data', {}).keys()),
list(input_data.get('market_data', {}).keys()))
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="开始计算最终估值A",
step_description="接收输入参数,准备开始计算。",
input_params=input_data,
status="processing"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"FINAL_A",
status="processing",
input_params=input_data,
)
step_order += 1
try:
# 详细记录模型数据参数
@ -178,19 +168,6 @@ class FinalValueACalculator:
int(model_duration * 1000),
list(model_result.keys()))
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算模型估值B",
step_description="调用ModelValueBCalculator计算模型估值B。",
input_params=input_data.get('model_data', {}),
output_result=model_result,
status="completed"
)
)
step_order += 1
# 计算市场估值C
logger.info("final_value_a.calculating_market_value_c 开始计算市场估值C")
market_start_time = time.time()
@ -208,19 +185,6 @@ class FinalValueACalculator:
int(market_duration * 1000),
input_data['market_data'])
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算市场估值C",
step_description="调用MarketValueCCalculator计算市场估值C。",
input_params=input_data.get('market_data', {}),
output_result=market_result,
status="completed"
)
)
step_order += 1
# 计算最终估值A
logger.info("final_value_a.calculating_final_value_a 开始计算最终估值A: 模型估值B={}万元 市场估值C={}万元",
model_value_b, market_value_c)
@ -240,16 +204,18 @@ class FinalValueACalculator:
int(model_duration * 1000),
int(market_duration * 1000))
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算最终估值A",
step_description="最终估值A = 模型估值B × 0.7 + 市场估值C × 0.3",
input_params={"model_value_b": model_value_b, "market_value_c": market_value_c},
output_result={"final_value_a": final_value_a},
status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"FINAL_A",
status="completed",
output_result={
"model_value_b": model_value_b,
"market_value_c": market_value_c,
"final_value_ab": final_value_a,
"model_duration_ms": int(model_duration * 1000),
"market_duration_ms": int(market_duration * 1000),
"total_duration_ms": int(total_duration * 1000),
},
)
return {
"model_value_b": model_value_b,
@ -259,15 +225,11 @@ class FinalValueACalculator:
except Exception as e:
logger.error("final_value_a.calculation_failed 计算失败: 错误={}", str(e))
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算失败",
step_description="计算过程中发生错误。",
status="failed",
error_message=str(e)
)
await self.valuation_controller.log_formula_step(
valuation_id,
"FINAL_A",
status="failed",
error_message=str(e),
)
raise

View File

@ -13,13 +13,11 @@ try:
from ..economic_value_b1.economic_value_b1 import EconomicValueB1Calculator
from ..cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError:
# 绝对导入(当直接运行时)
from app.utils.calculation_engine.economic_value_b1.economic_value_b1 import EconomicValueB1Calculator
from app.utils.calculation_engine.cultural_value_b2.cultural_value_b2 import CulturalValueB2Calculator
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
class ModelValueBCalculator:
@ -74,18 +72,12 @@ class ModelValueBCalculator:
Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录然后重新抛出
"""
step_order = 1
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="开始计算模型估值B",
step_description="接收输入参数,准备开始计算。",
input_params=input_data,
status="processing"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B",
status="processing",
input_params=input_data,
)
step_order += 1
current_stage = "初始化模型估值B参数"
try:
@ -102,60 +94,27 @@ class ModelValueBCalculator:
# 计算经济价值B1传入估值ID并等待异步完成
current_stage = "经济价值B1计算"
economic_value_b1 = await self.economic_value_calculator.calculate_complete_economic_value_b1(
economic_result = await self.economic_value_calculator.calculate_complete_economic_value_b1(
valuation_id,
input_data['economic_data']
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算经济价值B1",
step_description="调用EconomicValueB1Calculator计算经济价值B1。",
input_params=input_data.get('economic_data', {}),
output_result={"economic_value_b1": economic_value_b1},
status="completed"
)
)
step_order += 1
economic_value_b1 = economic_result["economic_value_b1"]
# 计算文化价值B2传入估值ID并等待异步完成
current_stage = "文化价值B2计算"
cultural_value_b2 = await self.cultural_value_calculator.calculate_complete_cultural_value_b2(
cultural_result = await self.cultural_value_calculator.calculate_complete_cultural_value_b2(
valuation_id,
input_data['cultural_data']
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算文化价值B2",
step_description="调用CulturalValueB2Calculator计算文化价值B2。",
input_params=input_data.get('cultural_data', {}),
output_result={"cultural_value_b2": cultural_value_b2},
status="completed"
)
)
step_order += 1
cultural_value_b2 = cultural_result["cultural_value_b2"]
# 计算风险调整系数B3传入估值ID并等待异步完成
current_stage = "风险调整系数B3计算"
risk_value_b3 = await self.risk_adjustment_calculator.calculate_complete_risky_value_b3(
risk_result = await self.risk_adjustment_calculator.calculate_complete_risky_value_b3(
valuation_id,
input_data['risky_data']
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算风险调整系数B3",
step_description="调用RiskAdjustmentB3Calculator计算风险调整系数B3。",
input_params=input_data.get('risky_data', {}),
output_result={"risk_adjustment_b3": risk_value_b3},
status="completed"
)
)
step_order += 1
risk_value_b3 = risk_result["risk_value_b3"]
# 计算模型估值B
current_stage = "模型估值B汇总"
@ -164,33 +123,28 @@ class ModelValueBCalculator:
cultural_value_b2,
risk_value_b3
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算模型估值B",
step_description="模型估值B = 经济价值B1*0.7+文化价值B2*0.3*风险调整系数B3",
input_params={"economic_value_b1": economic_value_b1, "cultural_value_b2": cultural_value_b2, "risk_value_b3": risk_value_b3},
output_result={"model_value_b": model_value_b},
status="completed"
)
)
return {
result = {
"economic_value_b1": economic_value_b1,
"cultural_value_b2": cultural_value_b2,
"risk_value_b3": risk_value_b3,
"model_value_b": model_value_b,
"economic_details": economic_result,
"cultural_details": cultural_result,
"risk_details": risk_result,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B",
status="completed",
output_result=result,
)
return result
except Exception as e:
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=step_order,
step_name="计算失败",
step_description="计算过程中发生错误。",
status="failed",
error_message=f"{current_stage}失败: {e}"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B",
status="failed",
error_message=f"{current_stage}失败: {e}",
)
raise

View File

@ -0,0 +1,271 @@
"""
公式元数据注册表
用于将计算引擎中的每个公式节点含子公式映射到唯一的 code名称公式说明以及排序
以便在 valuation_calculation_steps 表中进行结构化记录并最终生成可读的计算报告
"""
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional
@dataclass(frozen=True)
class FormulaMeta:
code: str
name: str
formula: str
order: Decimal
parent_code: Optional[str]
group_code: str
FormulaTreeNode = Dict[str, object]
def _node(
code: str,
name: str,
formula: str,
order: str,
children: Optional[List[FormulaTreeNode]] = None,
group: Optional[str] = None,
) -> FormulaTreeNode:
return {
"code": code,
"name": name,
"formula": formula,
"order": order,
"group_code": group,
"children": children or [],
}
FORMULA_TREE: List[FormulaTreeNode] = [
_node(
"FINAL_A",
"最终估值A",
"最终估值A = 模型估值B × 0.7 + 市场估值C × 0.3",
"10",
children=[
_node(
"MODEL_B",
"模型估值B",
"模型估值B = 经济价值B1 × 0.7 + 文化价值B2 × 0.3 × 风险调整系数B3",
"20",
group="MODEL_B",
children=[
_node(
"MODEL_B_ECON_B1",
"经济价值B1",
"经济价值B1 = 基础价值B11 × (1 + 流量因子B12) × 政策乘数B13",
"21",
children=[
_node(
"MODEL_B_ECON_B11",
"基础价值B11",
"基础价值B11 = 财务价值F × (0.45 + 0.05 × 行业系数I) + 法律强度L × (0.35 + 0.05 × 行业系数I) + 发展潜力D × 0.2",
"21.1",
children=[
_node(
"MODEL_B_ECON_B11_FINANCIAL_VALUE",
"财务价值F",
"财务价值F = [3年内年均收益 × (1 + 增长率)^5] ÷ 5",
"21.11",
),
_node(
"MODEL_B_ECON_B11_LEGAL_STRENGTH",
"法律强度L",
"法律强度L = 专利分 × 0.4 + 普及分 × 0.3 + 侵权分 × 0.3",
"21.12",
),
_node(
"MODEL_B_ECON_B11_DEVELOPMENT_POTENTIAL",
"发展潜力D",
"发展潜力D = 专利分 × 0.5 + ESG分 × 0.2 + 创新投入比 × 0.3",
"21.13",
),
_node(
"MODEL_B_ECON_B11_INDUSTRY_COEFFICIENT",
"行业系数I",
"行业系数I = (目标行业平均ROE - 基准行业ROE) ÷ 基准行业ROE",
"21.14",
),
],
),
_node(
"MODEL_B_FLOW_B12",
"流量因子B12",
"流量因子B12 = ln(S1 ÷ S2) × 0.3 + 社交媒体传播度S3 × 0.7",
"21.2",
children=[
_node(
"MODEL_B_FLOW_B12_INTERACTION_INDEX",
"互动量指数",
"互动量指数 = (点赞 + 评论 + 分享) ÷ 1000",
"21.21",
),
_node(
"MODEL_B_FLOW_B12_COVERAGE_INDEX",
"覆盖人群指数",
"覆盖人群指数 = 粉丝数 ÷ 10000",
"21.22",
),
_node(
"MODEL_B_FLOW_B12_CONVERSION_EFFICIENCY",
"转化效率",
"转化效率 = 商品链接点击量 ÷ 内容浏览量",
"21.23",
),
_node(
"MODEL_B_FLOW_B12_SOCIAL_SPREAD",
"社交媒体传播度S3",
"社交媒体传播度S3 = 互动量指数 × 0.4 + 覆盖人群指数 × 0.3 + 转化效率 × 0.3",
"21.24",
),
],
),
_node(
"MODEL_B_POLICY_B13",
"政策乘数B13",
"政策乘数B13 = 1 + 政策契合度评分P × 0.15,其中 P = 政策匹配度 × 0.4 + 实施阶段评分 × 0.3 + 资金支持度 × 0.3",
"21.3",
),
],
),
_node(
"MODEL_B_CULTURAL_B2",
"文化价值B2",
"文化价值B2 = 活态传承系数B21 × 0.6 + (纹样基因值B22 ÷ 10) × 0.4",
"22",
children=[
_node(
"MODEL_B_CULTURAL_B21",
"活态传承系数B21",
"活态传承系数B21 = 传承人等级系数 × 0.4 + 教学传播频次 × 0.3 + 跨界合作深度 × 0.3",
"22.1",
children=[
_node(
"MODEL_B_CULTURAL_B21_TEACHING_FREQ",
"教学传播频次",
"教学传播频次 = 线下传习次数 × 0.6 + 线上课程点击量(万) × 0.4",
"22.11",
),
],
),
_node(
"MODEL_B_CULTURAL_B22",
"纹样基因值B22",
"纹样基因值B22 = (结构复杂度SC × 0.6 + 归一化信息熵H × 0.4) × 历史传承度HI × 10",
"22.2",
),
],
),
_node(
"MODEL_B_RISK_B3",
"风险调整系数B3",
"风险调整系数B3 = 0.8 + 风险评分总和R × 0.4,其中 R = 市场风险 × 0.3 + 法律风险 × 0.4 + 传承风险 × 0.3",
"23",
children=[
_node(
"MODEL_B_RISK_B3_MARKET",
"市场风险",
"市场风险依据价格波动率:波动率 ≤5% 计10分5-15%计5分>15%计0分",
"23.1",
),
_node(
"MODEL_B_RISK_B3_LEGAL",
"法律风险",
"法律风险根据诉讼状态评分(无诉讼/已解决/未解决)",
"23.2",
),
_node(
"MODEL_B_RISK_B3_INHERITANCE",
"传承风险",
"传承风险依据传承人年龄≤50岁10分50-70岁5分>70岁0分取最高分",
"23.3",
),
],
),
],
),
_node(
"MARKET_C",
"市场估值C",
"市场估值C = 市场竞价C1 × 热度系数C2 × 稀缺性乘数C3 × 时效性衰减C4",
"30",
group="MARKET_C",
children=[
_node(
"MARKET_C_C1",
"市场竞价C1",
"市场竞价C1 结合历史交易价格、人工竞价与专家估值的加权结果",
"30.1",
),
_node(
"MARKET_C_C2",
"热度系数C2",
"热度系数C2 = 1 + 浏览热度分(依据日均浏览量与收藏数量)",
"30.2",
),
_node(
"MARKET_C_C3",
"稀缺性乘数C3",
"稀缺性乘数C3 = 1 + 稀缺等级分",
"30.3",
),
_node(
"MARKET_C_C4",
"时效性衰减C4",
"时效性衰减C4 依据距最近市场活动天数的衰减系数",
"30.4",
),
],
),
],
)
]
def _build_index() -> Dict[str, FormulaMeta]:
index: Dict[str, FormulaMeta] = {}
def dfs(nodes: List[FormulaTreeNode], parent_code: Optional[str], group_code: Optional[str]):
for node in nodes:
code = node["code"]
name = node["name"]
formula = node["formula"]
order = Decimal(str(node["order"]))
explicit_group = node.get("group_code")
if explicit_group:
current_group = explicit_group
elif parent_code is None:
current_group = code
else:
current_group = group_code or parent_code
meta = FormulaMeta(
code=code,
name=name,
formula=formula,
order=order,
parent_code=parent_code,
group_code=current_group,
)
index[code] = meta
dfs(node.get("children", []), code, current_group)
dfs(FORMULA_TREE, None, None)
return index
FORMULA_INDEX: Dict[str, FormulaMeta] = _build_index()
def get_formula_meta(code: str) -> FormulaMeta:
meta = FORMULA_INDEX.get(code)
if not meta:
raise KeyError(f"公式编码未注册: {code}")
return meta

View File

@ -20,7 +20,6 @@ try:
from .sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
from .market_data_analyzer import market_data_analyzer
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError:
# 绝对导入(当直接运行时)
from sub_formulas.market_bidding_c1 import MarketBiddingC1Calculator
@ -29,7 +28,6 @@ except ImportError:
from sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
from market_data_analyzer import market_data_analyzer
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
logger = logging.getLogger(__name__)
@ -103,7 +101,7 @@ class MarketValueCCalculator:
return market_value / 10000.0
async def calculate_complete_market_value_c(self, valuation_id: int, input_data: Dict) -> float:
async def calculate_complete_market_value_c(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
"""
计算完整的市场估值C并记录每一步的计算过程
@ -124,20 +122,16 @@ class MarketValueCCalculator:
}
Returns:
float: 计算得出的市场估值C
Dict[str, float]: 包含市场估值C及子公式结果的字典
Raises:
Exception: 如果在计算过程中发生任何错误将记录失败状态并重新抛出异常
"""
step = await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=3,
step_name="市场估值C计算",
step_description="开始计算市场估值C公式为市场竞价C1 × 热度系数C2 × 稀缺性乘数C3 × 时效性衰减C4",
input_params=input_data,
status="in_progress"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C",
status="processing",
input_params=input_data,
)
try:
# 计算市场竞价C1
@ -146,11 +140,16 @@ class MarketValueCCalculator:
manual_bids=input_data.get('manual_bids', []),
expert_valuations=input_data.get('expert_valuations', [])
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=3.1, step_name="市场竞价C1",
output_result={'market_bidding_c1': market_bidding_c1}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C_C1",
status="completed",
input_params={
"weighted_average_price": input_data.get('weighted_average_price'),
"manual_bids": input_data.get('manual_bids'),
"expert_valuations": input_data.get('expert_valuations'),
},
output_result={'market_bidding_c1': market_bidding_c1},
)
# 计算热度系数C2
@ -158,33 +157,39 @@ class MarketValueCCalculator:
input_data.get('daily_browse_volume', 500.0),
input_data.get('collection_count', 50)
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=3.2, step_name="热度系数C2",
output_result={'heat_coefficient_c2': heat_coefficient_c2}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C_C2",
status="completed",
input_params={
"daily_browse_volume": input_data.get('daily_browse_volume'),
"collection_count": input_data.get('collection_count'),
},
output_result={'heat_coefficient_c2': heat_coefficient_c2},
)
# 计算稀缺性乘数C3
scarcity_multiplier_c3 = self.scarcity_multiplier_calculator.calculate_scarcity_multiplier_c3(
input_data.get('issuance_level', '限量')
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=3.3, step_name="稀缺性乘数C3",
output_result={'scarcity_multiplier_c3': scarcity_multiplier_c3}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C_C3",
status="completed",
input_params={'issuance_level': input_data.get('issuance_level')},
output_result={'scarcity_multiplier_c3': scarcity_multiplier_c3},
)
# 计算时效性衰减C4
temporal_decay_c4 = self.temporal_decay_calculator.calculate_temporal_decay_c4(
input_data.get('recent_market_activity', '2024-01-15')
)
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=3.4, step_name="时效性衰减C4",
output_result={'temporal_decay_c4': temporal_decay_c4}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C_C4",
status="completed",
input_params={'recent_market_activity': input_data.get('recent_market_activity')},
output_result={'temporal_decay_c4': temporal_decay_c4},
)
# 计算市场估值C
@ -194,16 +199,29 @@ class MarketValueCCalculator:
scarcity_multiplier_c3,
temporal_decay_c4
)
await self.valuation_controller.update_calculation_step(
step.id, {"status": "completed", "output_result": {"market_value_c": market_value_c}}
result = {
"market_value_c": market_value_c,
"market_bidding_c1": market_bidding_c1,
"heat_coefficient_c2": heat_coefficient_c2,
"scarcity_multiplier_c3": scarcity_multiplier_c3,
"temporal_decay_c4": temporal_decay_c4,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C",
status="completed",
output_result=result,
)
return market_value_c
return result
except Exception as e:
error_message = f"市场估值C计算失败: {e}"
logger.error(error_message, exc_info=True)
await self.valuation_controller.update_calculation_step(
step.id, {"status": "failed", "error_message": error_message}
await self.valuation_controller.log_formula_step(
valuation_id,
"MARKET_C",
status="failed",
error_message=str(e),
)
raise

View File

@ -15,9 +15,7 @@ sys.path.append(os.path.join(current_dir, '..', '..', '..', '..'))
try:
from app.controllers.valuation import ValuationController
from app.schemas.valuation import ValuationCalculationStepCreate
except ImportError:
# 处理可能的导入错误
pass
class RiskAdjustmentB3Calculator:
@ -167,7 +165,7 @@ class RiskAdjustmentB3Calculator:
return max_score
async def calculate_complete_risky_value_b3(self, valuation_id: int, input_data: Dict) -> float:
async def calculate_complete_risky_value_b3(self, valuation_id: int, input_data: Dict) -> Dict[str, float]:
"""
计算完整的风险调整系数B3并记录所有计算步骤
@ -187,43 +185,45 @@ class RiskAdjustmentB3Calculator:
}
Returns:
float: 计算得出的风险调整系数B3
Dict[str, float]: 包含各项风险评分和风险调整系数的字典
Raises:
Exception: 在计算过程中遇到的任何异常都会被捕获记录并重新抛出
"""
step = await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id,
step_order=2.3,
step_name="风险调整系数B3计算",
step_description="开始计算风险调整系数B3公式为0.8 + 风险评分总和R × 0.4",
input_params=input_data,
status="in_progress"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3",
status="processing",
input_params=input_data,
)
try:
# 计算各项风险评分
market_risk = self.calculate_market_risk(input_data["highest_price"], input_data["lowest_price"])
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.31, step_name="市场风险评分",
output_result={'market_risk': market_risk}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3_MARKET",
status="completed",
input_params={
"highest_price": input_data.get("highest_price"),
"lowest_price": input_data.get("lowest_price"),
},
output_result={'market_risk': market_risk},
)
legal_risk = self.calculate_legal_risk(input_data["lawsuit_status"])
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.32, step_name="法律风险评分",
output_result={'legal_risk': legal_risk}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3_LEGAL",
status="completed",
input_params={"lawsuit_status": input_data.get("lawsuit_status")},
output_result={'legal_risk': legal_risk},
)
inheritance_risk = self.calculate_inheritance_risk(input_data["inheritor_ages"])
await self.valuation_controller.create_calculation_step(
ValuationCalculationStepCreate(
valuation_id=valuation_id, step_order=2.33, step_name="传承风险评分",
output_result={'inheritance_risk': inheritance_risk}, status="completed"
)
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3_INHERITANCE",
status="completed",
input_params={"inheritor_ages": input_data.get("inheritor_ages")},
output_result={'inheritance_risk': inheritance_risk},
)
# 计算风险评分总和R
@ -231,15 +231,27 @@ class RiskAdjustmentB3Calculator:
# 计算风险调整系数B3
risk_adjustment_b3 = self.calculate_risk_adjustment_b3(risk_score_sum)
await self.valuation_controller.update_calculation_step(
step.id, {"status": "completed", "output_result": {'risk_adjustment_b3': risk_adjustment_b3}}
result = {
"risk_value_b3": risk_adjustment_b3,
"risk_score_sum": risk_score_sum,
"market_risk": market_risk,
"legal_risk": legal_risk,
"inheritance_risk": inheritance_risk,
}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3",
status="completed",
output_result=result,
)
return risk_adjustment_b3
return result
except Exception as e:
error_message = f"风险调整系数B3计算失败: {e}"
await self.valuation_controller.update_calculation_step(
step.id, {"status": "failed", "error_message": error_message}
await self.valuation_controller.log_formula_step(
valuation_id,
"MODEL_B_RISK_B3",
status="failed",
error_message=str(e),
)
raise

View File

@ -21,11 +21,14 @@ server {
index index.html index.htm;
try_files $uri /index.html;
}
# PC 前端(/pc/ 前缀)
location = /pc {
return 302 /pc/;
}
location ^~ /pc/ {
root /opt/vue-fastapi-admin/web1/dist;
index index.html index.htm;
try_files $uri /index.html;
alias /opt/vue-fastapi-admin/web1/dist/;
index index.html;
try_files $uri $uri/ /index.html;
}
location ^~ /api/ {
proxy_pass http://127.0.0.1:9999;
@ -33,4 +36,4 @@ server {
proxy_set_header X-Real-IP $remote_addr;
}
}
}