guzhi/app/api/v1/app_valuations/app_valuations.py
2025-12-04 14:44:23 +08:00

944 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from random import random
import statistics
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from typing import Optional, List, Dict, Any
import json
import asyncio
import time
from app.controllers.user_valuation import user_valuation_controller
from app.controllers.valuation import valuation_controller
from app.schemas.valuation import ValuationAssessmentUpdate
from app.schemas.valuation import (
UserValuationCreate,
UserValuationQuery,
UserValuationList,
UserValuationOut,
UserValuationDetail
)
from app.schemas.base import Success, BasicResponse
from app.utils.app_user_jwt import get_current_app_user_id, get_current_app_user
from app.utils.calculation_engine import FinalValueACalculator
# from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import cross_border_depth_dict
from app.utils.calculation_engine.drp import DynamicPledgeRateCalculator
# from app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11 import calculate_popularity_score
from app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12 import calculate_search_index_s1
from app.log.log import logger
from app.models.esg import ESG
from app.models.industry import Industry
from app.models.policy import Policy
from app.models.user import AppUser
from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"])
async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate):
"""
后台任务:执行估值计算
"""
try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await ESG.filter(name=data.industry).first()
except Exception as e:
logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e))
esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0)
# 根据行业查询 行业修正系数与ROE
try:
industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e))
fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0
# 根据行业查询 政策匹配度
try:
policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e))
policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0
# 提取 经济价值B1 计算参数
input_data_by_b1 = await _extract_calculation_params_b1(
data, esg_score=esg_score, industry_coefficient=fix_num_score, policy_match_score=policy_match_score
)
# ESG关联价值 ESG分 (0-10分)
input_data_by_b1["esg_score"] = esg_score
# 行业修正系数I
input_data_by_b1["industry_coefficient"] = fix_num_score
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"].get("target", None) # 诉讼标的
if _data:
infringement_score = 0.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info(data.industry)
except Exception as e:
logger.warning("valuation.patent_api_error err={}", repr(e))
input_data_by_b1["patent_count"] = 0.0
input_data_by_b1["patent_score"] = 0.0
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if
isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count_score = min(len(matched) * 2.5, 10.0)
input_data_by_b1["patent_count"] = float(patent_count_score)
else:
input_data_by_b1["patent_count"] = 0.0
years_total = calculate_total_years(data_list)
if years_total > 10:
patent_score = 10.0
elif years_total >= 5:
patent_score = 7.0
else:
patent_score = 3.0
input_data_by_b1["patent_score"] = patent_score
# 提取 文化价值B2 计算参数
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
input_data_by_b3["lawsuit_status"]=infringement_score
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
"model_data": {
# 经济价值B1 参数
"economic_data": input_data_by_b1,
# 文化价值B2 参数
"cultural_data": input_data_by_b2,
# 风险调整参数 B3
"risky_data": input_data_by_b3,
},
# 市场估值C 参数
"market_data": input_data_by_c,
}
calculator = FinalValueACalculator()
# 先创建估值记录以获取ID方便步骤落库关联
initial_detail = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='pending'
)
valuation_id = initial_detail.id
logger.info("valuation.init_created user_id={} valuation_id={}", user_id, valuation_id)
# 步骤1立即更新计算输入参数不管后续是否成功
try:
await valuation_controller.update_calc(
valuation_id,
ValuationAssessmentUpdate(
calculation_input=input_data,
)
)
logger.info("valuation.input_updated valuation_id={}", valuation_id)
except Exception as e:
logger.warning("valuation.failed_to_update_input valuation_id={} err={}", valuation_id, repr(e))
# 步骤1.5更新内置API计算字段
try:
# 准备内置API计算字段的值
api_calc_fields = {}
# ESG关联价值
api_calc_fields["esg_value"] = str(esg_score) if esg_score is not None else None
# 政策匹配度
api_calc_fields["policy_matching"] = str(policy_match_score) if policy_match_score is not None else None
# 侵权记录/法律风险
infringement_record_value = "有侵权记录" if infringement_score == 0.0 else "无侵权记录"
api_calc_fields["infringement_record"] = infringement_record_value
api_calc_fields["legal_risk"] = infringement_record_value
# 专利使用量
patent_count_value = input_data_by_b1.get("patent_count", 0.0)
api_calc_fields["patent_count"] = str(patent_count_value) if patent_count_value is not None else None
# 结构复杂度纹样基因熵值B22
structure_complexity_value = input_data_by_b2.get("structure_complexity", 1.5)
api_calc_fields["pattern_complexity"] = str(structure_complexity_value) if structure_complexity_value is not None else None
# 归一化信息熵H
normalized_entropy_value = input_data_by_b2.get("normalized_entropy", 9)
api_calc_fields["normalized_entropy"] = str(normalized_entropy_value) if normalized_entropy_value is not None else None
# 线上课程点击量暂时没有计算逻辑设为None或默认值
# api_calc_fields["online_course_views"] = None
# 基础质押率和流量修正系数暂时没有计算逻辑设为None或默认值
# api_calc_fields["base_pledge_rate"] = None
# api_calc_fields["flow_correction"] = None
if api_calc_fields:
await valuation_controller.update_calc(
valuation_id,
ValuationAssessmentUpdate(**api_calc_fields)
)
logger.info("valuation.api_calc_fields_updated valuation_id={} fields={}", valuation_id, list(api_calc_fields.keys()))
except Exception as e:
logger.warning("valuation.failed_to_update_api_calc_fields valuation_id={} err={}", valuation_id, repr(e))
# 计算最终估值A统一计算传入估值ID以关联步骤落库
calculation_result = await calculator.calculate_complete_final_value_a(valuation_id, input_data)
# 步骤2更新计算结果字段模型估值B、市场估值C、最终估值AB、完整计算结果
try:
await valuation_controller.update_calc(
valuation_id,
ValuationAssessmentUpdate(
model_value_b=calculation_result.get('model_value_b'),
market_value_c=calculation_result.get('market_value_c'),
final_value_ab=calculation_result.get('final_value_ab'),
calculation_result=calculation_result,
status='pending',
)
)
logger.info(
"valuation.result_updated valuation_id={} model_b={} market_c={} final_ab={}",
valuation_id,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception as e:
logger.warning("valuation.failed_to_update_result valuation_id={} err={}", valuation_id, repr(e))
# 计算动态质押
drp_c = DynamicPledgeRateCalculator()
'''
monthly_amount (float): 月交易额(万元)
heritage_level (str): 非遗等级
'''
# 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_start_ts = time.monotonic()
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
drp_duration_ms = int((time.monotonic() - drp_start_ts) * 1000)
# 记录动态质押率计算步骤
await valuation_controller.log_formula_step(
valuation_id,
"DYNAMIC_PLEDGE_RATE",
status="completed",
input_params={
"monthly_transaction_amount": data.monthly_transaction_amount,
"monthly_amount": monthly_amount,
"heritage_asset_level": data.heritage_asset_level,
},
output_result={
"dynamic_pledge_rate": drp_result,
"duration_ms": drp_duration_ms,
},
)
logger.info("valuation.drp_calculated valuation_id={} drp={} duration_ms={}", valuation_id, drp_result, drp_duration_ms)
# 步骤3更新动态质押率及相关字段
try:
# 从动态质押率计算器中获取基础质押率和流量修正系数
base_pledge_rate_value = "0.5" # 固定值:基础质押率 = 0.5
flow_correction_value = "0.3" # 固定值:流量修正系数 = 0.3
await valuation_controller.update_calc(
valuation_id,
ValuationAssessmentUpdate(
dynamic_pledge_rate=drp_result,
base_pledge_rate=base_pledge_rate_value,
flow_correction=flow_correction_value,
)
)
logger.info("valuation.drp_updated valuation_id={} drp={} base_rate={} flow_correction={}",
valuation_id, drp_result, base_pledge_rate_value, flow_correction_value)
except Exception as e:
logger.warning("valuation.failed_to_update_drp valuation_id={} err={}", valuation_id, repr(e))
# 结构化日志:关键分值
try:
duration_ms = int((time.monotonic() - start_ts) * 1000)
logger.info(
"valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}",
user_id,
duration_ms,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception:
pass
# 步骤4计算完成保持状态为 pending等待后台审核
try:
result = await valuation_controller.get_by_id(valuation_id)
logger.info("valuation.calc_finished valuation_id={} status=pending", valuation_id)
except Exception as e:
logger.warning("valuation.failed_to_fetch_after_calc valuation_id={} err={}", valuation_id, repr(e))
result = None
logger.info("valuation.background_calc_success user_id={} valuation_id={}", user_id, valuation_id)
except Exception as e:
import traceback
print(traceback.format_exc())
logger.error("valuation.background_calc_failed user_id={} err={}", user_id, repr(e))
# 计算失败时更新记录为失败状态
try:
if 'valuation_id' in locals():
# 准备失败时需要更新的字段
fail_update_fields = {"status": "rejected"}
# 如果 input_data 已经准备好,确保 calculation_input 被更新(即使计算失败)
if 'input_data' in locals():
fail_update_fields["calculation_input"] = input_data
# 如果内置API计算字段已经准备好也尝试更新即使计算失败
# 这些字段在步骤1.5中计算如果步骤1.5执行了,这些变量应该已经存在
api_calc_fields = {}
if 'esg_score' in locals():
api_calc_fields["esg_value"] = str(esg_score) if esg_score is not None else None
if 'policy_match_score' in locals():
api_calc_fields["policy_matching"] = str(policy_match_score) if policy_match_score is not None else None
if 'infringement_score' in locals():
infringement_record_value = "有侵权记录" if infringement_score == 0.0 else "无侵权记录"
api_calc_fields["infringement_record"] = infringement_record_value
api_calc_fields["legal_risk"] = infringement_record_value
if 'input_data_by_b1' in locals():
patent_count_value = input_data_by_b1.get("patent_count", 0.0)
api_calc_fields["patent_count"] = str(patent_count_value) if patent_count_value is not None else None
if 'input_data_by_b2' in locals():
structure_complexity_value = input_data_by_b2.get("structure_complexity", 1.5)
api_calc_fields["pattern_complexity"] = str(structure_complexity_value) if structure_complexity_value is not None else None
normalized_entropy_value = input_data_by_b2.get("normalized_entropy", 9)
api_calc_fields["normalized_entropy"] = str(normalized_entropy_value) if normalized_entropy_value is not None else None
# 合并所有需要更新的字段
fail_update_fields.update(api_calc_fields)
try:
await valuation_controller.update_calc(
valuation_id,
ValuationAssessmentUpdate(**fail_update_fields)
)
logger.info("valuation.failed_but_fields_saved valuation_id={} fields={}", valuation_id, list(fail_update_fields.keys()))
except Exception as input_err:
logger.warning("valuation.failed_to_save_fields_on_error valuation_id={} err={}", valuation_id, repr(input_err))
# 如果保存失败,至少更新状态
try:
fail_update = ValuationAssessmentUpdate(status='rejected')
await valuation_controller.update_calc(valuation_id, fail_update)
except Exception:
pass
else:
# 如果 valuation_id 都不存在,说明在创建记录时就失败了,无法更新
logger.warning("valuation.failed_before_creation user_id={}", user_id)
except Exception as create_error:
logger.error("valuation.failed_to_update_record user_id={} err={}", user_id, repr(create_error))
@app_valuations_router.post("/", summary="创建估值评估", response_model=BasicResponse[dict])
async def calculate_valuation(
background_tasks: BackgroundTasks,
data: UserValuationCreate,
user_id: int = Depends(get_current_app_user_id)
):
"""
创建估值评估任务
根据用户提交的估值评估数据启动后台计算任务进行经济价值B1计算
请求示例JSON (仅包含用户填写部分):
{
"asset_name": "传统刺绣工艺",
"institution": "某文化传承机构",
"industry": "传统手工艺",
// 财务状况 (用户填写)
"annual_revenue": "500", // 近12个月机构营收/万元
"rd_investment": "50", // 近12个月机构研发投入/万元
"three_year_income": [400, 450, 500], // 近三年机构收益/万元
// 非遗等级与技术 (用户填写)
"inheritor_level": "国家级传承人", // 非遗传承人等级
"inheritor_ages": [45, 60, 75], // 传承人年龄列表
"heritage_level": "国家级", // 非遗等级
"patent_application_no": "CN202310123456.7", // 专利申请号
"patent_remaining_years": "15", // 专利剩余年限
"historical_evidence": { // 历史证明证据及数量
"历史文献": 3,
"考古发现": 2,
"传承谱系": 5
},
"pattern_images": ["demo.jpg"], // 非遗纹样图片
// 非遗应用与推广 (用户填写)
"application_maturity": "成熟应用", // 应用成熟度
"application_coverage": "全国覆盖", // 应用覆盖范围
"cooperation_depth": "0.5", // 跨界合作深度
"offline_activities": "12", // 近12个月线下宣讲活动次数
"online_accounts": [ // 线上宣传账号信息
{"platform": "抖音", "account": "传统刺绣大师"},
{"platform": "微博", "account": "非遗传承人"}
],
// 市场信息 (用户填写)
"sales_volume": "1000", // 近12个月销售量
"link_views": "5000", // 近12个月链接浏览量
"circulation": "限量", // 发行量
"last_market_activity": "2024-01-15", // 最近一次市场活动时间
"price_fluctuation": [95.0, 105.0], // 近30天价格波动区间
"manual_bids": [48000.0, 50000.0, 52000.0], // 手动收集的竞价列表
// 政策相关 (用户填写)
"funding_status": "国家级资助", // 资金支持情况
"implementation_stage": "成熟应用" // 实施阶段
}
API获取参数 (系统自动获取,无需用户填写):
- 搜索指数: 百度、微信、微博搜索指数
- 社交媒体数据: 点赞数、评论数、转发数、粉丝数
- 交易数据: 近3个月加权平均价格
- 热度数据: 近7日日均浏览量、收藏数
- ESG评分: 根据行业自动匹配
- 行业系数: 根据行业ROE计算
- 政策匹配度: 根据行业自动匹配
- 专利验证: 通过API验证专利有效性
- 侵权记录: 通过API查询侵权诉讼历史
"""
try:
from app.models.user import AppUser, AppUserQuotaLog
user = await AppUser.filter(id=user_id).first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
if (user.remaining_quota or 0) < 1:
raise HTTPException(status_code=400, detail="估值次数不足")
before = user.remaining_quota or 0
user.remaining_quota = before - 1
await user.save()
try:
await AppUserQuotaLog.create(
app_user_id=user_id,
operator_id=user_id,
operator_name=user.alias or user.username or user.phone or "",
before_count=before,
after_count=before - 1,
remark="发起估值"
)
except Exception:
pass
background_tasks.add_task(_perform_valuation_calculation, user_id, data)
logger.info("valuation.task_queued user_id={} asset_name={} industry={}",
user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None))
return Success(
data={
"task_status": "queued",
"message": "估值计算任务已提交,正在后台处理中",
"user_id": user_id,
"asset_name": getattr(data, 'asset_name', None)
}
)
except Exception as e:
logger.error("valuation.task_queue_failed user_id={} err={}", user_id, repr(e))
raise HTTPException(status_code=500, detail=f"任务提交失败: {str(e)}")
async def _extract_calculation_params_b1(
data: UserValuationCreate,
esg_score: float = 0.0,
industry_coefficient: float = 0.0,
policy_match_score: float = 0.0,
) -> Dict[str, Any]:
"""
从用户提交的数据中提取计算所需的参数
Args:
data: 用户提交的估值评估数据
Returns:
Dict: 计算所需的参数字典
"""
# 基础价值B11相关参数
# 财务价值所需数据 从近三年收益计算
three_year_income = [safe_float(income) for income in data.three_year_income]
# 法律强度L相关参数
# 普及地域分值 默认 7分
# 普及地域分:全球覆盖(10)、全国覆盖(7)、区域覆盖(4),默认全国覆盖(7)
try:
coverage = data.application_coverage or "全国覆盖"
mapping = {"全球覆盖": 10.0, "全国覆盖": 7.0, "区域覆盖": 4.0}
popularity_score = mapping.get(coverage, 7.0)
except Exception:
popularity_score = 7.0
# 创新投入比 = (研发费用/营收) * 100
try:
rd_investment = float(data.rd_investment) or 0
annual_revenue = float(data.annual_revenue) or 1 # 避免除零
innovation_ratio = (rd_investment / annual_revenue) * 100 if annual_revenue > 0 else 0
except (ValueError, TypeError):
innovation_ratio = 0.0
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = 1
# 获取微信指数并计算近30天平均值
try:
wechat_index_response = universal_api.wx_index(data.asset_name)
wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response)
logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}")
except Exception as e:
logger.error(f"获取微信指数失败: {e}")
wechat_index = 1
weibo_index = 1
search_index_s1 = calculate_search_index_s1(baidu_index, wechat_index, weibo_index) # 默认值实际应从API获取
# 行业均值S2 - 从数据库查询行业数据计算
from app.utils.industry_calculator import calculate_industry_average_s2
industry_average_s2 = await calculate_industry_average_s2(data.industry)
# 社交媒体传播度S3
# likes: 点赞数(API获取)
# comments: 评论数(API获取)
# shares: 转发数(API获取)
# followers: 粉丝数
# click_count: 商品链接点击量(用户填写) sales_volume 使用 sales_volume 销售量
# view_count: 内容浏览量(用户填写) link_views
# 政策乘数B13相关参数
# 政策契合度评分P - 根据行业和资助情况计算
# 实施阶段 - 需要转换为对应的评分
implementation_stage_str = data.implementation_stage or "成熟应用"
# 资金支持 - 需要转换为对应的评分
funding_support_str = data.funding_status or "无资助"
# 使用PolicyMultiplierB13Calculator来计算评分
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \
PolicyMultiplierB13Calculator
policy_calculator = PolicyMultiplierB13Calculator()
implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str)
funding_support = policy_calculator.calculate_funding_support_score(funding_support_str)
# 获取线下账号 转发 点赞 评论信息 {kuaishou: {account: "123456789", likes: "33", comments: "33", shares: "33"}}
platform_accounts_data = data.platform_accounts
platform_key = next(iter(platform_accounts_data)) # 或 list(data.keys())[0]
info = platform_accounts_data[platform_key]
return {
# 基础价值B11相关参数
'three_year_income': three_year_income,
'popularity_score': popularity_score,
'innovation_ratio': innovation_ratio,
# 流量因子B12相关参数
'search_index_s1': search_index_s1,
'industry_average_s2': industry_average_s2,
# 'social_media_spread_s3': 0.0,
# 这些社交数据暂未来源置为0后续接入API再填充
'likes': safe_float(info["likes"]),
'comments': safe_float(info["comments"]),
'shares': safe_float(info["shares"]),
# followers 非当前计算用键,先移除避免干扰
# click_count 与 view_count 目前未参与计算,先移除
'sales_volume': safe_float(data.sales_volume),#
'link_views': safe_float(data.link_views),
# 政策乘数B13相关参数
'implementation_stage': implementation_stage,
'funding_support': funding_support,
'esg_score': safe_float(esg_score),
'industry_coefficient': safe_float(industry_coefficient),
'policy_match_score': safe_float(policy_match_score),
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]:
"""
argrg:
data: 用户提交的估值评估数据
retus:
Dict: 计算所需的参数字典
"""
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \
LivingHeritageB21Calculator
# 活态传承系数B21 县官参数
living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = safe_float(data.offline_activities) # 线下传习次数
platform_accounts_data = data.platform_accounts
rs = {}
for platform, info in platform_accounts_data.items():
rs[platform] = {
"likes": safe_float(info.get("likes")),
}
# 以下调用API douyin\bilibili\kuaishou
douyin_views = safe_float(rs.get("douyin", None).get("likes", 0)) if rs.get("douyin", None) else 0
kuaishou_views = safe_float(rs.get("kuaishou", None).get("likes", 0)) if rs.get("kuaishou", None) else 0
bilibili_views = safe_float(rs.get("bilibili", None).get("likes", 0)) if rs.get("bilibili", None) else 0
# 跨界合作深度:将枚举映射为项目数;若为数值字符串则直接取数值
try:
val = getattr(data, 'cooperation_depth', None)
mapping = {
"品牌联名": 3.0,
"科技载体": 5.0,
"国家外交礼品": 10.0,
}
if isinstance(val, str):
cross_border_depth = mapping.get(val, safe_float(val))
else:
cross_border_depth = safe_float(val)
except Exception:
cross_border_depth = 0.0
# 纹样基因值B22相关参数
# 以下三项需由后续模型/服务计算;此处提供默认可计算占位
#
# 历史传承度HI(用户填写)
historical_inheritance = 0.0
try:
if isinstance(data.historical_evidence, dict):
historical_inheritance = sum([safe_float(v) for v in data.historical_evidence.values()])
elif isinstance(data.historical_evidence, (list, tuple)):
historical_inheritance = sum([safe_float(i) for i in data.historical_evidence])
except Exception:
historical_inheritance = 0.0
structure_complexity = 1.5 # 默认值 纹样基因熵值B22(系统计算)
normalized_entropy = 9 # 默认值 归一化信息熵H(系统计算)
logger.info(
"b2.params inheritor_level_coefficient={} offline_sessions={} douyin_views={} kuaishou_views={} bilibili_views={} cross_border_depth={} historical_inheritance={} structure_complexity={} normalized_entropy={}",
inheritor_level_coefficient,
offline_sessions,
douyin_views,
kuaishou_views,
bilibili_views,
cross_border_depth,
historical_inheritance,
structure_complexity,
normalized_entropy,
)
return {
"inheritor_level_coefficient": inheritor_level_coefficient,
"offline_sessions": offline_sessions,
"douyin_views": douyin_views,
"kuaishou_views": kuaishou_views,
"bilibili_views": bilibili_views,
"cross_border_depth": cross_border_depth,
"historical_inheritance": historical_inheritance,
"structure_complexity": structure_complexity,
"normalized_entropy": normalized_entropy,
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
price_fluctuation = [float(i) for i in data.price_fluctuation]
highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation)
# lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = [float(i) for i in data.inheritor_age_count] # [45, 60, 75] # 传承人年龄列表
return {
"highest_price": highest_price,
"lowest_price": lowest_price,
"inheritor_ages": inheritor_ages,
}
# 获取 市场估值C 相关参数
async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, Any]:
# 市场竞价C1 TODO 暂无
# transaction_data: 交易数据字典(API获取)
# manual_bids: 手动收集的竞价列表(用户填写)
# expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = {}
manual_bids: List[float] = []
# 处理月交易额波动区间的三个关键数值:最高价、最低价、中位数
# 已实现从data.price_fluctuation中提取并计算三个数值
price_fluctuation_median = 0 # 中位数
price_fluctuation_max = 0 # 最高价
price_fluctuation_min = 0 # 最低价
if hasattr(data, 'price_fluctuation') and data.price_fluctuation:
try:
# 将price_fluctuation转换为浮点数列表
price_values = [float(i) for i in data.price_fluctuation if i is not None]
if price_values:
price_fluctuation_max = max(price_values)
manual_bids.append(price_fluctuation_max)
price_fluctuation_min = min(price_values)
manual_bids.append(price_fluctuation_min)
price_fluctuation_median = statistics.median(price_values)
manual_bids.append(price_fluctuation_median)
except (ValueError, TypeError) as e:
# 如果转换失败,记录日志但不中断程序
logger.warning(f"价格波动区间数据转换失败: {e}")
expert_valuations = []
# 浏览热度分 - 优化数据获取逻辑
daily_browse_volume = 0 # 默认值
# 第一优先级使用用户填写的该商品近12个月的链接浏览量
if hasattr(data, 'link_views') and data.link_views:
try:
# 尝试将字符串转换为浮点数
daily_browse_volume = float(data.link_views)
except (ValueError, TypeError):
# 如果转换失败,继续使用其他数据源
pass
# 第二优先级如果没有link_views尝试从平台账户数据中获取
if daily_browse_volume == 0 and hasattr(data, 'platform_accounts') and data.platform_accounts:
try:
platform_accounts_data = data.platform_accounts
# 获取第一个平台的数据
first_platform_key = next(iter(platform_accounts_data))
first_platform_data = platform_accounts_data[first_platform_key]
# 尝试获取浏览量
platform_views = first_platform_data.get("views", "0")
daily_browse_volume = float(platform_views)
except (ValueError, TypeError, StopIteration, AttributeError):
# 如果获取失败,保持默认值
daily_browse_volume = 0
# 收藏数 - 尝试从平台账户数据中获取
collection_count = 0 # 默认值
if hasattr(data, 'platform_accounts') and data.platform_accounts:
try:
platform_accounts_data = data.platform_accounts
# 获取第一个平台的数据
first_platform_key = next(iter(platform_accounts_data))
first_platform_data = platform_accounts_data[first_platform_key]
# 尝试获取收藏数
platform_likes = first_platform_data.get("likes", "0")
collection_count = int(platform_likes)
except (ValueError, TypeError, StopIteration, AttributeError):
# 如果获取失败,保持默认值
collection_count = 0
# 稀缺性乘数C3 发行量
circulation = data.scarcity_level or '限量'
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
recent_market_activity = data.market_activity_time
# 如果为空、None或"0",设置默认值
if not recent_market_activity or recent_market_activity == "0":
# recent_market_activity = '2024-01-15'
recent_market_activity = '近一月'
return {
# 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x}
"weighted_average_price": transaction_data,
"manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写)
"expert_valuations": expert_valuations, # 专家估值列表 (系统配置)
# 计算热度系数C2
"daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取)
"collection_count": collection_count, # 收藏数
"issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
}
@app_valuations_router.get("/", summary="获取我的估值评估列表", response_model=BasicResponse[dict])
async def get_my_valuations(
query: UserValuationQuery = Depends(),
current_user: AppUser = Depends(get_current_app_user)
):
"""
获取当前用户的估值评估列表
"""
try:
result = await user_valuation_controller.get_user_valuations(
user_id=current_user.id,
query=query
)
# 使用model_dump_json()来正确序列化datetime然后解析为dict列表
import json
serialized_items = [json.loads(item.model_dump_json()) for item in result.items]
return Success(
data={
"items": serialized_items,
"total": result.total,
"page": result.page,
"page_size": result.size,
"pages": result.pages,
}
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取估值评估列表失败: {str(e)}"
)
@app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情", response_model=BasicResponse[dict])
async def get_valuation_detail(
valuation_id: int,
current_user: AppUser = Depends(get_current_app_user)
):
"""
获取指定估值评估的详细信息
"""
try:
result = await user_valuation_controller.get_user_valuation_detail(
user_id=current_user.id,
valuation_id=valuation_id
)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="估值评估记录不存在"
)
# 使用model_dump_json()来正确序列化datetime然后解析为dict
import json
result_dict = json.loads(result.model_dump_json())
return Success(data=result_dict)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取估值评估详情失败: {str(e)}"
)
@app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计", response_model=BasicResponse[dict])
async def get_my_valuation_statistics(
current_user: AppUser = Depends(get_current_app_user)
):
"""
获取当前用户的估值评估统计信息
"""
try:
result = await user_valuation_controller.get_user_valuation_statistics(
user_id=current_user.id
)
return Success(data=result)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取统计信息失败: {str(e)}"
)
@app_valuations_router.delete("/{valuation_id}", summary="删除估值评估", response_model=BasicResponse[dict])
async def delete_valuation(
valuation_id: int,
current_user: AppUser = Depends(get_current_app_user)
):
"""
删除指定的估值评估记录(软删除)
"""
try:
result = await user_valuation_controller.delete_user_valuation(
user_id=current_user.id,
valuation_id=valuation_id
)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="估值评估记录不存在或已被删除"
)
return Success(data={"deleted": True})
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除估值评估失败: {str(e)}"
)
def calculate_total_years(data_list):
current_date = datetime.now().date()
total_years = 0
date_count = 0
for item in data_list:
if "SQRQ" in item and item["SQRQ"]:
try:
# 解析日期字符串
sqrq_date = datetime.strptime(item["SQRQ"], "%Y-%m-%d").date()
# 计算与当前日期的差值(年,包含小数)
date_diff = current_date - sqrq_date
years_diff = date_diff.days / 365.25
total_years += years_diff
date_count += 1
except ValueError as e:
return 0
return total_years
def safe_float(v):
try:
return float(v)
except (ValueError, TypeError):
return 0.0
from app.log.log import logger