feat(valuation): 优化估值评估流程并添加删除功能

- 将估值计算改为后台任务执行,提高响应速度
- 添加估值评估记录的软删除功能
- 更新评估状态字段值从approved/rejected改为success/fail
- 修复注册接口的HTTP状态码问题
- 更新API版本号和服务器配置
- 禁用FastAPI尾部斜杠重定向
This commit is contained in:
邹方成 2025-10-14 10:59:56 +08:00
parent 50e02dc37d
commit 9d950ba368
8 changed files with 251 additions and 197 deletions

View File

@ -33,6 +33,7 @@ def create_app() -> FastAPI:
openapi_url="/openapi.json",
middleware=make_middlewares(),
lifespan=lifespan,
redirect_slashes=False, # 禁用尾部斜杠重定向
)
# 注册静态文件目录
# app.mount("/static", StaticFiles(directory="app/static"), name="static")

View File

@ -38,7 +38,7 @@ async def register(
}
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
raise HTTPException(status_code=200, detail=str(e))
@router.post("/login", response_model=AppUserJWTOut, summary="用户登录")

View File

@ -1,7 +1,7 @@
from random import random
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from typing import Optional, List, Dict, Any
import json
import asyncio
@ -33,15 +33,191 @@ from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"])
async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate):
"""
后台任务执行估值计算
"""
try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e))
esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0)
# 根据行业查询 行业修正系数与ROE
try:
industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e))
fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0
# 根据行业查询 政策匹配度
try:
policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e))
policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0
# 提取 经济价值B1 计算参数
input_data_by_b1 = await _extract_calculation_params_b1(data)
# ESG关联价值 ESG分 (0-10分)
input_data_by_b1["esg_score"] = esg_score
# 行业修正系数I
input_data_by_b1["industry_coefficient"] = fix_num_score
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"].get("target",None) # 诉讼标的
if _data:
infringement_score = 0.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info(data.industry)
except Exception as e:
logger.warning("valuation.patent_api_error err={}", repr(e))
input_data_by_b1["patent_count"] = 0.0
input_data_by_b1["patent_score"] = 0.0
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count)
else:
input_data_by_b1["patent_count"] = 0.0
patent_score = calculate_patent_score(calculate_total_years(data_list))
input_data_by_b1["patent_score"] = patent_score
# 提取 文化价值B2 计算参数
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
if infringement_score == 10.0:
input_data_by_b3["lawsuit_status"] = "无诉讼状态"
if 0 < infringement_score < 4.0:
input_data_by_b3["lawsuit_status"] = "已解决诉讼"
else:
input_data_by_b3["lawsuit_status"] = "未解决诉讼"
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
"model_data": {
# 经济价值B1 参数
"economic_data": input_data_by_b1,
# 文化价值B2 参数
"cultural_data": input_data_by_b2,
# 风险调整参数 B3
"risky_data": input_data_by_b3,
},
# 市场估值C 参数
"market_data": input_data_by_c,
}
calculator = FinalValueACalculator()
# 计算最终估值A统一计算
calculation_result = calculator.calculate_complete_final_value_a(input_data)
# 计算动态质押
drp_c = DynamicPledgeRateCalculator()
'''
monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级
'''
# 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值
try:
duration_ms = int((time.monotonic() - start_ts) * 1000)
logger.info(
"valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}",
user_id,
duration_ms,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception:
pass
# 创建估值评估记录
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result,
status='success' # 计算成功设置为approved状态
)
logger.info("valuation.background_calc_success user_id={} valuation_id={}", user_id, result.id)
except Exception as e:
import traceback
print(traceback.format_exc())
logger.error("valuation.background_calc_failed user_id={} err={}", user_id, repr(e))
# 计算失败时也创建记录状态设置为failed
try:
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='rejected' # 计算失败设置为rejected状态
)
logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id)
except Exception as create_error:
logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error))
@app_valuations_router.post("/", summary="创建估值评估")
async def calculate_valuation(
background_tasks: BackgroundTasks,
data: UserValuationCreate,
user_id: int = Depends(get_current_app_user_id)
):
"""
计算估值评估
创建估值评估任务
根据用户提交的估值评估数据调用计算引擎进行经济价值B1计算
根据用户提交的估值评估数据启动后台计算任务进行经济价值B1计算
请求示例JSON (仅包含用户填写部分):
{
@ -101,195 +277,27 @@ async def calculate_valuation(
- 专利验证: 通过API验证专利有效性
- 侵权记录: 通过API查询侵权诉讼历史
"""
try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e))
esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0)
# 根据行业查询 行业修正系数与ROE
try:
industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e))
fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0
# 根据行业查询 政策匹配度
try:
policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e))
policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0
# 提取 经济价值B1 计算参数
input_data_by_b1 = await _extract_calculation_params_b1(data)
# ESG关联价值 ESG分 (0-10分)
input_data_by_b1["esg_score"] = esg_score
# 行业修正系数I
input_data_by_b1["industry_coefficient"] = fix_num_score
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"].get("target",None) # 诉讼标的
if _data:
infringement_score = 0.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info(data.industry)
except Exception as e:
logger.warning("valuation.patent_api_error err={}", repr(e))
input_data_by_b1["patent_count"] = 0.0
input_data_by_b1["patent_score"] = 0.0
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count)
else:
input_data_by_b1["patent_count"] = 0.0
patent_score = calculate_patent_score(calculate_total_years(data_list))
input_data_by_b1["patent_score"] = patent_score
# 提取 文化价值B2 计算参数
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
if infringement_score == 10.0:
input_data_by_b3["lawsuit_status"] = "无诉讼状态"
if 0 < infringement_score < 4.0:
input_data_by_b3["lawsuit_status"] = "已解决诉讼"
else:
input_data_by_b3["lawsuit_status"] = "未解决诉讼"
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
"model_data": {
# 经济价值B1 参数
"economic_data": input_data_by_b1,
# 文化价值B2 参数
"cultural_data": input_data_by_b2,
# 风险调整参数 B3
"risky_data": input_data_by_b3,
},
# 市场估值C 参数
"market_data": input_data_by_c,
}
calculator = FinalValueACalculator()
# 计算最终估值A统一计算
calculation_result = calculator.calculate_complete_final_value_a(input_data)
# 计算动态质押
drp_c = DynamicPledgeRateCalculator()
'''
monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级
'''
# 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值
try:
duration_ms = int((time.monotonic() - start_ts) * 1000)
logger.info(
"valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}",
user_id,
duration_ms,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception:
pass
# 创建估值评估记录
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result,
status='approved' # 计算成功设置为approved状态
# 添加后台任务
background_tasks.add_task(_perform_valuation_calculation, user_id, data)
logger.info("valuation.task_queued user_id={} asset_name={} industry={}",
user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None))
return Success(
data={
"task_status": "queued",
"message": "估值计算任务已提交,正在后台处理中",
"user_id": user_id,
"asset_name": getattr(data, 'asset_name', None)
},
msg="估值计算任务已启动"
)
# 组装返回
result_dict = json.loads(result.model_dump_json())
# "calculation_result": {
# "model_value_b": 660.1534497474814,
# "market_value_c": 8800.0,
# "final_value_ab": 3102.107414823237
# }
result_dict['calculation_result'] = calculation_result
result_dict['calculation_input'] = {
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
}
return Success(data=result_dict, msg="估值计算完成")
except Exception as e:
import traceback
print(traceback.format_exc())
logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e))
# 计算失败时也创建记录状态设置为failed
try:
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='rejected' # 计算失败设置为rejected状态
)
logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id)
except Exception as create_error:
logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error))
raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}")
logger.error("valuation.task_queue_failed user_id={} err={}", user_id, repr(e))
raise HTTPException(status_code=500, detail=f"任务提交失败: {str(e)}")
async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
@ -563,6 +571,36 @@ async def get_my_valuation_statistics(
)
@app_valuations_router.delete("/{valuation_id}", summary="删除估值评估")
async def delete_valuation(
valuation_id: int,
current_user: AppUser = Depends(get_current_app_user)
):
"""
删除指定的估值评估记录软删除
"""
try:
result = await user_valuation_controller.delete_user_valuation(
user_id=current_user.id,
valuation_id=valuation_id
)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="估值评估记录不存在或已被删除"
)
return Success(data={"deleted": True}, msg="删除估值评估成功")
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除估值评估失败: {str(e)}"
)
def calculate_total_years(data_list):
current_date = datetime.now().date()
total_years = 0

View File

@ -58,7 +58,7 @@ class ThirdPartyAPIController:
)
# 站长之家API便捷方法
async def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> APIResponse:
async def query_copyright_software(self, company_name: str, chinaz_ver: str = "1.0") -> APIResponse:
"""查询企业软件著作权"""
return await self.make_api_request(
provider="chinaz",
@ -71,7 +71,7 @@ class ThirdPartyAPIController:
}
)
async def query_patent_info(self, company_name: str, chinaz_ver: str = "1") -> APIResponse:
async def query_patent_info(self, company_name: str, chinaz_ver: str = "2.0") -> APIResponse:
"""查询企业专利信息"""
return await self.make_api_request(
provider="chinaz",

View File

@ -22,7 +22,7 @@ class UserValuationController:
"""用户创建估值评估"""
valuation_data = data.model_dump()
valuation_data['user_id'] = user_id
valuation_data['status'] = status # 根据计算结果显示设置状态
valuation_data['status'] = "success" # 根据计算结果显示设置状态
# 添加计算结果到数据库
if calculation_result:
@ -97,6 +97,21 @@ class UserValuationController:
'rejected': rejected
}
async def delete_user_valuation(self, user_id: int, valuation_id: int) -> bool:
"""删除用户的估值评估(软删除)"""
valuation = await self.model.filter(
id=valuation_id,
user_id=user_id,
is_active=True
).first()
if not valuation:
return False
# 软删除:设置 is_active 为 False
await valuation.update_from_dict({'is_active': False}).save()
return True
async def _to_user_out(self, valuation: ValuationAssessment) -> UserValuationOut:
"""转换为用户端输出模型"""
return UserValuationOut.model_validate(valuation)

View File

@ -78,7 +78,7 @@ class ValuationAssessment(Model):
# 系统字段
user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户")
status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)")
status = fields.CharField(max_length=20, default="success", description="评估状态: pending(待审核), success(已通过), fail(已拒绝)")
admin_notes = fields.TextField(null=True, description="管理员备注")
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")

View File

@ -39,7 +39,7 @@ class ValuationAssessmentBase(BaseModel):
offline_activities: Optional[str] = Field(None, description="近12个月线下相关宣讲活动次数")
offline_teaching_count: Optional[int] = Field(None, description="近12个月线下相关演讲活动次数")
online_accounts: Optional[List[Any]] = Field(None, description="线上相关宣传账号信息")
platform_accounts: Optional[Dict[str, Dict[str, int]]] = Field(None, description="线上相关宣传账号信息")
platform_accounts: Optional[Dict[str, Dict[str, Union[str, int]]]] = Field(None, description="线上相关宣传账号信息")
# 非遗资产衍生商品信息
sales_volume: Optional[str] = Field(None, description="该商品近12个月销售量")

View File

@ -18,9 +18,9 @@ class Settings(BaseSettings):
DEBUG: bool = True
# 服务器配置
SERVER_HOST: str = "124.222.245.240"
SERVER_HOST: str = "https://value.cdcee.net"
SERVER_PORT: int = 9999
BASE_URL: str = f"http://{SERVER_HOST}:8080"
BASE_URL: str = f"{SERVER_HOST}"
PROJECT_ROOT: str = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
BASE_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, os.pardir))