Compare commits

..

2 Commits

Author SHA1 Message Date
f298acb431 Merge branch 'main' of https://git.1024tool.vip/zfc/guzhi 2025-10-10 16:34:02 +08:00
2322dbad00 feat: 添加估值计算功能并优化相关逻辑
refactor(valuation): 重构估值计算参数提取逻辑
fix: 修复行业均值S2计算中的除零错误
feat(api): 新增微信指数计算工具和行业数据查询工具
feat(schema): 在估值模型中添加计算结果字段
refactor: 优化动态质押率计算中的月交易额解析
fix: 处理日期解析异常时返回默认值
docs: 更新API文档中的估值计算请求示例
2025-10-10 16:33:59 +08:00
14 changed files with 1036 additions and 92 deletions

View File

@ -1,5 +1,8 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional from typing import Optional, List, Dict, Any
import json
import asyncio
import time
from app.controllers.user_valuation import user_valuation_controller from app.controllers.user_valuation import user_valuation_controller
from app.schemas.valuation import ( from app.schemas.valuation import (
@ -9,89 +12,431 @@ from app.schemas.valuation import (
UserValuationOut, UserValuationOut,
UserValuationDetail UserValuationDetail
) )
from app.models.user import AppUser
from app.schemas.base import Success, SuccessExtra from app.schemas.base import Success, SuccessExtra
from app.utils.app_user_jwt import get_current_app_user from app.utils.app_user_jwt import get_current_app_user_id, get_current_app_user
from app.utils.calculation_engine import FinalValueACalculator
from app.utils.calculation_engine.drp import DynamicPledgeRateCalculator
from app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11 import calculate_popularity_score, \
calculate_infringement_score, calculate_patent_usage_score
from app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12 import calculate_search_index_s1
from app.log.log import logger
from app.models.esg import ESG
from app.models.industry import Industry
from app.models.policy import Policy
from app.models.user import AppUser
from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"]) app_valuations_router = APIRouter(tags=["用户端估值评估"])
@app_valuations_router.post("/", summary="创建估值评估") @app_valuations_router.post("/", summary="创建估值评估")
async def create_valuation( async def calculate_valuation(
data: UserValuationCreate, data: UserValuationCreate,
current_user: AppUser = Depends(get_current_app_user) user_id: int = Depends(get_current_app_user_id)
): ):
""" """
用户创建估值评估申请 计算估值评估
根据用户提交的估值评估数据调用计算引擎进行经济价值B1计算
请求示例JSON (仅包含用户填写部分):
{
"asset_name": "传统刺绣工艺",
"institution": "某文化传承机构",
"industry": "传统手工艺",
// 财务状况 (用户填写)
"annual_revenue": "500", // 近12个月机构营收/万元
"rd_investment": "50", // 近12个月机构研发投入/万元
"three_year_income": [400, 450, 500], // 近三年机构收益/万元
// 非遗等级与技术 (用户填写)
"inheritor_level": "国家级传承人", // 非遗传承人等级
"inheritor_ages": [45, 60, 75], // 传承人年龄列表
"heritage_level": "国家级", // 非遗等级
"patent_application_no": "CN202310123456.7", // 专利申请号
"patent_remaining_years": "15", // 专利剩余年限
"historical_evidence": { // 历史证明证据及数量
"历史文献": 3,
"考古发现": 2,
"传承谱系": 5
},
"pattern_images": ["demo.jpg"], // 非遗纹样图片
// 非遗应用与推广 (用户填写)
"application_maturity": "成熟应用", // 应用成熟度
"application_coverage": "全国覆盖", // 应用覆盖范围
"cooperation_depth": "0.5", // 跨界合作深度
"offline_activities": "12", // 近12个月线下宣讲活动次数
"online_accounts": [ // 线上宣传账号信息
{"platform": "抖音", "account": "传统刺绣大师"},
{"platform": "微博", "account": "非遗传承人"}
],
// 市场信息 (用户填写)
"sales_volume": "1000", // 近12个月销售量
"link_views": "5000", // 近12个月链接浏览量
"circulation": "限量", // 发行量
"last_market_activity": "2024-01-15", // 最近一次市场活动时间
"price_fluctuation": [95.0, 105.0], // 近30天价格波动区间
"manual_bids": [48000.0, 50000.0, 52000.0], // 手动收集的竞价列表
// 政策相关 (用户填写)
"funding_status": "国家级资助", // 资金支持情况
"implementation_stage": "成熟应用" // 实施阶段
}
API获取参数 (系统自动获取无需用户填写):
- 搜索指数: 百度微信微博搜索指数
- 社交媒体数据: 点赞数评论数转发数粉丝数
- 交易数据: 近3个月加权平均价格
- 热度数据: 近7日日均浏览量收藏数
- ESG评分: 根据行业自动匹配
- 行业系数: 根据行业ROE计算
- 政策匹配度: 根据行业自动匹配
- 专利验证: 通过API验证专利有效性
- 侵权记录: 通过API查询侵权诉讼历史
""" """
try: try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e))
esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0)
# 根据行业查询 行业修正系数与ROE
try:
industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e))
fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0
# 根据行业查询 政策匹配度
try:
policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e))
policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0
# 提取 经济价值B1 计算参数
input_data_by_b1 = await _extract_calculation_params_b1(data)
# ESG关联价值 ESG分 (0-10分)
input_data_by_b1["esg_score"] = esg_score
# 行业修正系数I
input_data_by_b1["industry_coefficient"] = fix_num_score
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info("未找到 企业名称、企业统代、企业注册号")
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count)
else:
input_data_by_b1["patent_count"] = 0.0
input_data_by_b1["patent_score"] = 0
except Exception as e:
logger.warning("valuation.patent_api_error err={}", repr(e))
input_data_by_b1["patent_count"] = 0.0
# 提取 文化价值B2 计算参数
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
"model_data": {
# 经济价值B1 参数
"economic_data": input_data_by_b1,
# 文化价值B2 参数
"cultural_data": input_data_by_b2,
# 风险调整参数 B3
"risky_data": input_data_by_b3,
},
# 市场估值C 参数
"market_data": input_data_by_c,
}
calculator = FinalValueACalculator()
# 计算最终估值A统一计算
calculation_result = calculator.calculate_complete_final_value_a(input_data)
# 计算动态质押
drp_c = DynamicPledgeRateCalculator()
'''
monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级
'''
# 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值
try:
duration_ms = int((time.monotonic() - start_ts) * 1000)
logger.info(
"valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}",
user_id,
duration_ms,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception:
pass
# 创建估值评估记录
result = await user_valuation_controller.create_valuation( result = await user_valuation_controller.create_valuation(
user_id=current_user.id, user_id=user_id,
data=data data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result
) )
# 使用model_dump_json()来正确序列化datetime然后解析为dict
import json # 组装返回
result_dict = json.loads(result.model_dump_json()) result_dict = json.loads(result.model_dump_json())
# 开始计算 估值 信息 # "calculation_result": {
# 1 # 经济价值B1模块: EconomicValueB1Calculator | BasicValueB11Calculator | TrafficFactorB12Calculator | PolicyMultiplierB13Calculator # "model_value_b": 660.1534497474814,
# 1.1 EconomicValueB1Calculator # "market_value_c": 8800.0,
# input_data = { # "final_value_ab": 3102.107414823237
# # 基础价值B11相关参数 # }
# 'three_year_income': data.three_year_income, result_dict['calculation_result'] = calculation_result
# 'patent_score': data.pa, # 专利分
# 'popularity_score': data.popularity_score, # 普及地域分值
# 'infringement_score': data.infringement_score, # 侵权分
# 'innovation_ratio': data.innovation_ratio,
# 'esg_score':data.esg_score,
# 'industry_coefficient':data.industry_coefficient,
# # 流量因子B12相关参数 result_dict['calculation_input'] = {
# 'search_index_s1': 4500.0, 'model_data': {
# 'industry_average_s2': 5000.0, 'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
# # 'social_media_spread_s3': social_media_spread_s3, 'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
# 'likes': 4, # 点赞 'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
# 'comments': 5, # 评论 },
# 'shares': 6, # 转发 'market_data': list(input_data.get('market_data', {}).keys()),
# 'followers': 7, # 粉丝数 }
# 'click_count': 1000,# 点击量 return Success(data=result_dict, msg="估值计算完成")
# 'view_count': 100, # 内容浏览量
# # 政策乘数B13相关参数
# 'policy_match_score': 10.0, # 政策匹配度
# 'implementation_stage': 10.0, # 实施阶段评分
# 'funding_support': 10.0 # 资金支持度
# }
# 1.2 BasicValueB11Calculator
# 1.3 TrafficFactorB12Calculator
# 1.4 PolicyMultiplierB13Calculator
# 2 # 文化价值B2模块: CulturalValueB2Calculator | LivingHeritageB21Calculator | PatternGeneB22Calculator
# 2.1 CulturalValueB2Calculator
# 2.2 LivingHeritageB21Calculator
# 2.3 PatternGeneB22Calculator
# 3 # 风险调整系数B3模块: RiskAdjustmentB3Calculator
# 3.1 RiskAdjustmentB3Calculator
# 4 # 市场估值C模块: MarketValueCCalculator | MarketBiddingC1Calculator | HeatCoefficientC2Calculator | ScarcityMultiplierC3Calculator | TemporalDecayC4Calculator
# 4.1 MarketValueCCalculator
# 4.2 MarketBiddingC1Calculator
# 4.3 HeatCoefficientC2Calculator
# 4.4 ScarcityMultiplierC3Calculator
# 4.5 TemporalDecayC4Calculator
# 5 # 最终估值A模块: FinalValueACalculator
# 5.1 FinalValueACalculator
return Success(data=result_dict, msg="估值评估申请提交成功")
except Exception as e: except Exception as e:
raise HTTPException( import traceback
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, print(traceback.format_exc())
detail=f"创建估值评估失败: {str(e)}" logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e))
) raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}")
async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
"""
从用户提交的数据中提取计算所需的参数
Args:
data: 用户提交的估值评估数据
Returns:
Dict: 计算所需的参数字典
"""
# 基础价值B11相关参数
# 财务价值所需数据 从近三年收益计算
three_year_income = data.three_year_income or [0, 0, 0]
# 法律强度L相关参数
# 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0
# 创新投入比 = (研发费用/营收) * 100
try:
rd_investment = float(data.rd_investment or 0)
annual_revenue = float(data.annual_revenue or 1) # 避免除零
innovation_ratio = (rd_investment / annual_revenue) * 100 if annual_revenue > 0 else 0
except (ValueError, TypeError):
innovation_ratio = 0.0
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = 0
# 获取微信指数并计算近30天平均值
try:
1/0
wechat_index_response = universal_api.wx_index(data.asset_name)
wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response)
logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}")
except Exception as e:
logger.error(f"获取微信指数失败: {e}")
wechat_index = 0
weibo_index = 0
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
# 行业均值S2 - 从数据库查询行业数据计算
from app.utils.industry_calculator import calculate_industry_average_s2
industry_average_s2 = await calculate_industry_average_s2(data.industry)
# 社交媒体传播度S3 - TODO 需要使用第三方API,click_count view_count 未找到对应参数
# likes: 点赞数(API获取)
# comments: 评论数(API获取)
# shares: 转发数(API获取)
# followers: 粉丝数
# click_count: 商品链接点击量(用户填写) sales_volume 使用 sales_volume 销售量
# view_count: 内容浏览量(用户填写) link_views
# 政策乘数B13相关参数
# 政策契合度评分P - 根据行业和资助情况计算
# 实施阶段 - 需要转换为对应的评分
implementation_stage_str = data.application_maturity or "成熟应用"
# 资金支持 - 需要转换为对应的评分
funding_support_str = data.funding_status or "无资助"
# 使用PolicyMultiplierB13Calculator来计算评分
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import PolicyMultiplierB13Calculator
policy_calculator = PolicyMultiplierB13Calculator()
implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str)
funding_support = policy_calculator.calculate_funding_support_score(funding_support_str)
return {
# 基础价值B11相关参数
'three_year_income': three_year_income,
'popularity_score': popularity_score,
'infringement_score': infringement_score,
'innovation_ratio': innovation_ratio,
# 流量因子B12相关参数
'search_index_s1': search_index_s1,
'industry_average_s2': industry_average_s2,
'social_media_spread_s3': 0.0,
# 这些社交数据暂未来源置为0后续接入API再填充
'likes': 0,
'comments': 0,
'shares': 0,
# followers 非当前计算用键,先移除避免干扰
# click_count 与 view_count 目前未参与计算,先移除
# 政策乘数B13相关参数
'implementation_stage': implementation_stage,
'funding_support':funding_support
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]:
"""
argrg:
data: 用户提交的估值评估数据
retus:
Dict: 计算所需的参数字典
"""
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
# 活态传承系数B21 县官参数
living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = int(data.offline_activities) #线下传习次数
# 以下调用API douyin\bilibili\kuaishou
douyin_views = 0
kuaishou_views= 0
bilibili_views= 0
# 跨界合作深度 品牌联名0.3科技载体0.5国家外交礼品1.0
cross_border_depth = float(data.cooperation_depth)
# 纹样基因值B22相关参数
# 以下三项需由后续模型/服务计算;此处提供默认可计算占位
historical_inheritance = 0.8
structure_complexity = 0.75
normalized_entropy = 0.85
return {
"inheritor_level_coefficient": inheritor_level_coefficient,
"offline_sessions": offline_sessions,
"douyin_views": douyin_views,
"kuaishou_views": kuaishou_views,
"bilibili_views": bilibili_views,
"cross_border_depth": cross_border_depth,
"historical_inheritance": historical_inheritance,
"structure_complexity": structure_complexity,
"normalized_entropy": normalized_entropy,
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
highest_price,lowest_price= data.price_fluctuation
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return {
"highest_price": highest_price,
"lowest_price": lowest_price,
"lawsuit_status": lawsuit_status,
"inheritor_ages": inheritor_ages,
}
# 获取 市场估值C 相关参数
async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, Any]:
# 市场竞价C1 TODO 暂无
# transaction_data: 交易数据字典(API获取)
# manual_bids: 手动收集的竞价列表(用户填写)
# expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None
manual_bids: List[float] = None
expert_valuations: List[float] = None
# 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位)
# 稀缺性乘数C3 发行量
circulation = data.circulation or '限量'
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
recent_market_activity = data.last_market_activity
# 如果为空、None或"0",设置默认值
if not recent_market_activity or recent_market_activity == "0":
recent_market_activity = '2024-01-15'
return {
# 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x}
"weighted_average_price": transaction_data,
"manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写)
"expert_valuations": expert_valuations, # 专家估值列表 (系统配置)
# 计算热度系数C2
"daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取)
"collection_count": collection_count, # 收藏数
"issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
}
@app_valuations_router.get("/", summary="获取我的估值评估列表") @app_valuations_router.get("/", summary="获取我的估值评估列表")
async def get_my_valuations( async def get_my_valuations(
query: UserValuationQuery = Depends(), query: UserValuationQuery = Depends(),

View File

@ -0,0 +1,261 @@
{
'category': [
{
'type': 0,
'word': '全部'
},
{
'type': 7,
'word': '视频号'
},
{
'type': 2,
'word': '文章'
},
{
'type': 1,
'word': '公众号'
},
{
'type': 262208,
'word': '小程序'
},
{
'extra_kvs': [
{
'key': 'modelName',
'value': 'hunyuan-t1-turbos'
},
{
'key': 'modelType',
'value': 't1'
},
{
'key': 'fastModelType',
'value': 'fast'
}
],
'type': 53,
'word': 'AI搜索'
},
{
'type': 9,
'word': '直播'
},
{
'type': 1024,
'word': '读书'
},
{
'type': 512,
'word': '听一听'
},
{
'type': 384,
'word': '表情'
},
{
'type': 16777728,
'word': '百科'
},
{
'type': 16384,
'word': '新闻'
},
{
'selected': 1,
'type': 8192,
'word': '微信指数'
},
{
'type': 33562630,
'word': '划线'
},
{
'type': 8,
'word': '朋友圈'
}
],
'continueFlag': 0,
'cookies': '{"box_offset":0,"businessType":8192,"cookies_buffer":"UisIYhABGIBAIgzotYTkuqflkI3np7BQAYIBBRAAogEAsgECGACaAwRCAHIA","doc_offset":0,"dup_bf":"","isHomepage":0,"page_cnt":1,"query":"资产名称","scene":98}\n',
'data': [
{
'boxID': '0x40000-15048-',
'boxPos': 1,
'boxPosMerge': 1,
'count': 1,
'items': [
{
'docID': '1-7123206500956315942',
'itemInfo': {
'data': [
{
'index': '113',
'timestamp': 1757433600
},
{
'index': '51',
'timestamp': 1757520000
},
{
'index': '112',
'timestamp': 1757606400
},
{
'index': '26',
'timestamp': 1757692800
},
{
'index': '24',
'timestamp': 1757779200
},
{
'index': '118',
'timestamp': 1757865600
},
{
'index': '46',
'timestamp': 1757952000
},
{
'index': '90',
'timestamp': 1758038400
},
{
'index': '200',
'timestamp': 1758124800
},
{
'index': '70',
'timestamp': 1758211200
},
{
'index': '23',
'timestamp': 1758297600
},
{
'index': '1',
'timestamp': 1758384000
},
{
'index': '44',
'timestamp': 1758470400
},
{
'index': '93',
'timestamp': 1758556800
},
{
'index': '91',
'timestamp': 1758643200
},
{
'index': '6',
'timestamp': 1758729600
},
{
'index': '46',
'timestamp': 1758816000
},
{
'index': '1',
'timestamp': 1758902400
},
{
'index': '66',
'timestamp': 1758988800
},
{
'index': '45',
'timestamp': 1759075200
},
{
'index': '23',
'timestamp': 1759161600
},
{
'index': '22',
'timestamp': 1759248000
},
{
'index': '0',
'timestamp': 1759334400
},
{
'index': '0',
'timestamp': 1759420800
},
{
'index': '22',
'timestamp': 1759507200
},
{
'index': '22',
'timestamp': 1759593600
},
{
'index': '23',
'timestamp': 1759680000
},
{
'index': '1',
'timestamp': 1759766400
},
{
'index': '0',
'timestamp': 1759852800
},
{
'index': '24',
'timestamp': 1759939200
}
],
'title': '资产名称 - 微信指数',
'type': 1058
},
'jumpInfo': {
'jumpType': 2,
'reportId': 'chart:chart:317451',
'userName': 'gh_935b85261f35@app',
'weappUrl': '/page/detail/detail?calculate_word=资产名称'
},
'source': {
'iconUrl': 'http://mmbiz.qpic.cn/mmbiz_png/uic3vZ5pI29sdd0tUo5t4pxmJMsMO9iaxM8EOxTX1VUAWjQibib0juc81kMkUgtB9IibIg58TZocULicPJ7lichq8E1VQ/640?wx_fmt=png&wxfrom=200',
'title': '微信指数'
},
'title': ''
}
],
'real_type': 262144,
'subType': 15048,
'type': 52
}
],
'direction': 2,
'enableQuestionClosely': False,
'experiment': [
{
'key': 's1s_all_prefetch_next_page',
'value': '2'
},
{
'key': 'mmsearch_exp_financial_style',
'value': '2'
}
],
'isAutoPlayVideo': 1,
'isDivide': 0,
'isHomePage': 0,
'lang': 'zh_CN',
'nomoreText': '没有更多的搜索结果',
'offset': 20,
'pageNumber': 1,
'query': '资产名称',
'resultType': 0,
'ret': 0,
'searchID': '1448000677067408154',
'timeStamp': 1760081917,
'code': 0,
'cost_money': 0.5,
'remain_money': 28.5
}

View File

@ -191,7 +191,9 @@ async def calculate_valuation(
monthly_amount (float): 月交易额万元 monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级 heritage_level (str): 非遗等级
''' '''
drp_result = drp_c.calculate_dynamic_pledge_rate(float(data.monthly_transaction_amount),data.heritage_asset_level) # 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值 # 结构化日志:关键分值
try: try:
@ -209,7 +211,17 @@ async def calculate_valuation(
# 创建估值评估记录 # 创建估值评估记录
result = await user_valuation_controller.create_valuation( result = await user_valuation_controller.create_valuation(
user_id=user_id, user_id=user_id,
data=data data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result
) )
# 组装返回 # 组装返回
@ -319,8 +331,14 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
retus: retus:
Dict: 计算所需的参数字典 Dict: 计算所需的参数字典
""" """
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
# 活态传承系数B21 县官参数 # 活态传承系数B21 县官参数
inheritor_level_coefficient = data.inheritor_level # 传承人等级 living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = int(data.offline_activities) #线下传习次数 offline_sessions = int(data.offline_activities) #线下传习次数
# 以下调用API douyin\bilibili\kuaishou # 以下调用API douyin\bilibili\kuaishou
douyin_views = 0 douyin_views = 0
@ -377,7 +395,10 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
circulation = data.circulation or '限量' circulation = data.circulation or '限量'
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间 # 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
recent_market_activity = data.last_market_activity or '2024-01-15' recent_market_activity = data.last_market_activity
# 如果为空、None或"0",设置默认值
if not recent_market_activity or recent_market_activity == "0":
recent_market_activity = '2024-01-15'
return { return {
# 计算市场竞价C1 # 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x} # C1 的实现接受 transaction_data={'weighted_average_price': x}

View File

@ -18,12 +18,25 @@ class UserValuationController:
def __init__(self): def __init__(self):
self.model = ValuationAssessment self.model = ValuationAssessment
async def create_valuation(self, user_id: int, data: UserValuationCreate) -> UserValuationDetail: async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None) -> UserValuationDetail:
"""用户创建估值评估""" """用户创建估值评估"""
valuation_data = data.model_dump() valuation_data = data.model_dump()
valuation_data['user_id'] = user_id valuation_data['user_id'] = user_id
valuation_data['status'] = 'pending' # 默认状态为待审核 valuation_data['status'] = 'pending' # 默认状态为待审核
# 添加计算结果到数据库
if calculation_result:
valuation_data['model_value_b'] = calculation_result.get('model_value_b')
valuation_data['market_value_c'] = calculation_result.get('market_value_c')
valuation_data['final_value_ab'] = calculation_result.get('final_value_ab')
valuation_data['calculation_result'] = calculation_result
if calculation_input:
valuation_data['calculation_input'] = calculation_input
if drp_result is not None:
valuation_data['dynamic_pledge_rate'] = drp_result
valuation = await self.model.create(**valuation_data) valuation = await self.model.create(**valuation_data)
return await self._to_user_detail(valuation) return await self._to_user_detail(valuation)
@ -141,6 +154,13 @@ class UserValuationController:
legal_risk=valuation.legal_risk, legal_risk=valuation.legal_risk,
base_pledge_rate=valuation.base_pledge_rate, base_pledge_rate=valuation.base_pledge_rate,
flow_correction=valuation.flow_correction, flow_correction=valuation.flow_correction,
# 添加计算结果字段
model_value_b=valuation.model_value_b,
market_value_c=valuation.market_value_c,
final_value_ab=valuation.final_value_ab,
dynamic_pledge_rate=valuation.dynamic_pledge_rate,
calculation_result=valuation.calculation_result,
calculation_input=valuation.calculation_input,
status=valuation.status, status=valuation.status,
admin_notes=valuation.admin_notes, admin_notes=valuation.admin_notes,
created_at=valuation.created_at, created_at=valuation.created_at,

View File

@ -68,6 +68,14 @@ class ValuationAssessment(Model):
base_pledge_rate = fields.CharField(max_length=50, null=True, description="基础质押率") base_pledge_rate = fields.CharField(max_length=50, null=True, description="基础质押率")
flow_correction = fields.CharField(max_length=50, null=True, description="流量修正系数") flow_correction = fields.CharField(max_length=50, null=True, description="流量修正系数")
# 计算结果字段
model_value_b = fields.FloatField(null=True, description="模型估值B万元")
market_value_c = fields.FloatField(null=True, description="市场估值C万元")
final_value_ab = fields.FloatField(null=True, description="最终估值AB万元")
dynamic_pledge_rate = fields.FloatField(null=True, description="动态质押率")
calculation_result = fields.JSONField(null=True, description="完整计算结果JSON")
calculation_input = fields.JSONField(null=True, description="计算输入参数JSON")
# 系统字段 # 系统字段
user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户") user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户")
status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)") status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)")

View File

@ -66,6 +66,14 @@ class ValuationAssessmentBase(BaseModel):
base_pledge_rate: Optional[str] = Field(None, description="基础质押率") base_pledge_rate: Optional[str] = Field(None, description="基础质押率")
flow_correction: Optional[str] = Field(None, description="流量修正系数") flow_correction: Optional[str] = Field(None, description="流量修正系数")
# 计算结果字段
model_value_b: Optional[float] = Field(None, description="模型估值B万元")
market_value_c: Optional[float] = Field(None, description="市场估值C万元")
final_value_ab: Optional[float] = Field(None, description="最终估值AB万元")
dynamic_pledge_rate: Optional[float] = Field(None, description="动态质押率")
calculation_result: Optional[Dict[str, Any]] = Field(None, description="完整计算结果JSON")
calculation_input: Optional[Dict[str, Any]] = Field(None, description="计算输入参数JSON")
class ValuationAssessmentCreate(ValuationAssessmentBase): class ValuationAssessmentCreate(ValuationAssessmentBase):
"""创建估值评估模型""" """创建估值评估模型"""

View File

@ -18,9 +18,9 @@ class Settings(BaseSettings):
DEBUG: bool = True DEBUG: bool = True
# 服务器配置 # 服务器配置
SERVER_HOST: str = "127.0.0.1" SERVER_HOST: str = "124.222.245.240"
SERVER_PORT: int = 9999 SERVER_PORT: int = 9999
BASE_URL: str = f"http://{SERVER_HOST}:{SERVER_PORT}" BASE_URL: str = f"http://{SERVER_HOST}:8080"
PROJECT_ROOT: str = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) PROJECT_ROOT: str = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
BASE_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, os.pardir)) BASE_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, os.pardir))

View File

@ -127,15 +127,6 @@ class APIConfig:
"description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "bvid"], "required_params": ["token", "bvid"],
"optional_params": [] "optional_params": []
},
"jizhiliao_index_search": {
"path": "/fbmain/monitor/v3/web_search",
"method": "POST",
"description": "极致聊指数搜索",
"api_key": os.getenv("JIZHILIAO_API_KEY", "JZL089ef0b7d0315d96"),
"verifycode": os.getenv("JIZHILIAO_VERIFYCODE", ""),
"required_params": ["keyword", "mode", "BusinessType", "sub_search_type"],
"optional_params": ["key", "verifycode"]
} }
} }
}, },
@ -148,9 +139,11 @@ class APIConfig:
"web_search": { "web_search": {
"path": "/fbmain/monitor/v3/web_search", "path": "/fbmain/monitor/v3/web_search",
"method": "POST", "method": "POST",
"description": "获取微信指数", "description": "极致聊指数搜索",
"required_params": ["keyword", "key"], "api_key": os.getenv("JIZHILIAO_API_KEY", "JZL089ef0b7d0315d96"),
"optional_params": [] "verifycode": os.getenv("JIZHILIAO_VERIFYCODE", ""),
"required_params": ["keyword", "mode", "BusinessType", "sub_search_type"],
"optional_params": ["key", "verifycode"]
} }
} }
}, },

View File

@ -66,6 +66,47 @@ class DynamicPledgeRateCalculator:
else: else:
return 0.0 return 0.0
def parse_monthly_transaction_amount(self, monthly_transaction_str: str) -> float:
"""
解析月交易额字符串转换为数值万元
参数:
monthly_transaction_str (str): 月交易额字符串"月交易额<100万元""100-500万元"
返回:
float: 月交易额数值万元
"""
if not monthly_transaction_str:
return 0.0
# 清理字符串,移除空格和常见前缀
clean_str = monthly_transaction_str.strip().replace("月交易额", "").replace("万元", "")
# 处理不同的格式
if "<100" in clean_str or "小于100" in clean_str:
return 50.0 # 取中间值
elif "100-500" in clean_str or "100万-500万" in clean_str:
return 300.0 # 取中间值
elif ">500" in clean_str or "大于500" in clean_str or "≥500" in clean_str:
return 600.0 # 取一个合理值
elif "500-1000" in clean_str:
return 750.0 # 取中间值
elif ">1000" in clean_str or "大于1000" in clean_str:
return 1200.0 # 取一个合理值
else:
# 尝试提取数字
import re
numbers = re.findall(r'\d+', clean_str)
if numbers:
# 如果有多个数字,取平均值
if len(numbers) >= 2:
return (float(numbers[0]) + float(numbers[1])) / 2
else:
return float(numbers[0])
else:
# 默认值
return 50.0
def get_monthly_transaction_score(self, monthly_amount: float) -> float: def get_monthly_transaction_score(self, monthly_amount: float) -> float:
""" """
根据用户月交易额区间匹配评分 根据用户月交易额区间匹配评分

View File

@ -28,10 +28,10 @@ class TrafficFactorB12Calculator:
float: 流量因子B12 float: 流量因子B12
""" """
# 避免除零和对数计算错误 # 避免除零和对数计算错误
if industry_average_s2 <= 0: if industry_average_s2 == 0:
raise ValueError("行业均值S2必须大于0") raise ValueError("行业均值S2必须大于0")
if search_index_s1 <= 0: if search_index_s1 == 0:
# 如果搜索指数为0或负数使用最小值避免对数计算错误 # 如果搜索指数为0或负数使用最小值避免对数计算错误
search_index_s1 = 1.0 search_index_s1 = 1.0

View File

@ -45,6 +45,11 @@ class TemporalDecayC4Calculator:
# 处理输入日期 # 处理输入日期
if isinstance(target_date, str): if isinstance(target_date, str):
# 如果是数字字符串"0"或空字符串,使用默认日期
if target_date == "0" or target_date.strip() == "":
# 默认为一年前,返回"其他"
return "其他"
# 尝试不同的日期格式 # 尝试不同的日期格式
formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d'] formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d']
date_obj = None date_obj = None
@ -55,9 +60,15 @@ class TemporalDecayC4Calculator:
except ValueError: except ValueError:
continue continue
if date_obj is None: if date_obj is None:
raise ValueError(f"无法解析日期: {target_date}") # 如果无法解析日期,记录警告并返回默认值
print(f"警告: 无法解析日期 '{target_date}',使用默认值'其他'")
return "其他"
elif isinstance(target_date, datetime): elif isinstance(target_date, datetime):
date_obj = target_date.date() date_obj = target_date.date()
elif isinstance(target_date, (int, float)):
# 如果传入的是数字如0使用默认值
print(f"警告: 传入的日期参数是数字 {target_date},使用默认值'其他'")
return "其他"
else: else:
date_obj = target_date date_obj = target_date

View File

@ -0,0 +1,100 @@
"""
行业数据查询和计算工具
"""
import logging
from typing import Optional, Dict, Any
from app.models.industry import Industry
logger = logging.getLogger(__name__)
async def get_industry_data_by_name(industry_name: str) -> Optional[Dict[str, Any]]:
"""
根据行业名称查询行业数据
Args:
industry_name: 行业名称
Returns:
包含行业数据的字典如果未找到则返回None
{
'code': '行业代码',
'name': '行业名称',
'roe': 平均ROE,
'fix_num': 行业修正系数,
'remark': '备注'
}
"""
try:
industry = await Industry.filter(name=industry_name).first()
if industry:
return {
'code': industry.code,
'name': industry.name,
'roe': industry.roe,
'fix_num': industry.fix_num,
'remark': industry.remark
}
else:
logger.warning(f"未找到行业数据: {industry_name}")
return None
except Exception as e:
logger.error(f"查询行业数据失败: {industry_name}, 错误: {str(e)}")
return None
async def calculate_industry_average_s2(industry_name: str) -> float:
"""
计算行业均值S2
Args:
industry_name: 行业名称
Returns:
行业均值S2如果查询失败则返回0.0
"""
try:
industry_data = await get_industry_data_by_name(industry_name)
if industry_data:
# S2 = ROE * 修正系数
roe = industry_data.get('roe', 0.0)
fix_num = industry_data.get('fix_num', 0.0)
s2_value = roe * fix_num
# 确保S2值为正数避免对数计算错误
if s2_value <= 0:
logger.warning(f"行业 {industry_name} S2计算值为负数或零: ROE={roe}, 修正系数={fix_num}, S2={s2_value}使用默认值0.01")
s2_value = 0.01 # 使用小的正数避免对数计算错误
logger.info(f"行业 {industry_name} S2计算: ROE={roe}, 修正系数={fix_num}, S2={s2_value}")
return s2_value
else:
logger.warning(f"未找到行业 {industry_name} 的数据返回默认值0.01")
return 0.01 # 返回小的正数而不是0.0
except Exception as e:
logger.error(f"计算行业均值S2失败: {industry_name}, 错误: {str(e)}")
return 0.01 # 返回小的正数而不是0.0
async def get_all_industries() -> list:
"""
获取所有行业列表
Returns:
行业列表
"""
try:
industries = await Industry.all()
return [
{
'code': industry.code,
'name': industry.name,
'roe': industry.roe,
'fix_num': industry.fix_num,
'remark': industry.remark
}
for industry in industries
]
except Exception as e:
logger.error(f"获取行业列表失败: {str(e)}")
return []

View File

@ -237,11 +237,12 @@ class UniversalAPIManager:
prepared_params['token'] = api_key prepared_params['token'] = api_key
elif provider == 'dajiala': elif provider == 'dajiala':
# 微信指数 # 微信指数/极致聊接口需要 key 参数
# JustOneAPI需要token参数 endpoint_config = self._get_endpoint_config(provider, endpoint)
api_key = provider_config.get('api_key') if endpoint_config:
if api_key: api_key = endpoint_config.get('api_key')
prepared_params['key'] = api_key if api_key:
prepared_params['key'] = api_key
elif provider == 'jizhiliao': elif provider == 'jizhiliao':
# 极致聊接口需要 key默认从配置读取 # 极致聊接口需要 key默认从配置读取
@ -325,6 +326,9 @@ class UniversalAPIManager:
} }
response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout) response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout)
elif provider == 'dajiala':
# dajiala是微信指数/极致聊接口使用普通POST请求
response = self.session.post(url, json=prepared_params, timeout=timeout)
else: else:
response = self.session.post(url, json=prepared_params, timeout=timeout) response = self.session.post(url, json=prepared_params, timeout=timeout)
else: else:
@ -365,6 +369,7 @@ class UniversalAPIManager:
"sub_search_type": 0, "sub_search_type": 0,
"verifycode": "" "verifycode": ""
} }
return self.make_request('dajiala', 'web_search', data) return self.make_request('dajiala', 'web_search', data)
def video_views(self,platform_type,video_id): def video_views(self,platform_type,video_id):

View File

@ -0,0 +1,131 @@
"""
微信指数计算工具
用于处理微信指数API返回的数据并计算平均值
"""
from typing import Dict, Any, Optional, List
import logging
logger = logging.getLogger(__name__)
class WeChatIndexCalculator:
"""微信指数计算器"""
@staticmethod
def extract_index_data(api_response: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
从微信指数API响应中提取指数数据
Args:
api_response: 微信指数API的响应数据
Returns:
包含指数和时间戳的数据列表
"""
try:
# 检查响应状态
if api_response.get('code') != 0:
logger.error(f"微信指数API返回错误: {api_response}")
return []
# 提取数据路径: data[0].items[0].itemInfo.data
data_list = api_response.get('data', [])
if not data_list:
logger.warning("微信指数响应中没有数据")
return []
first_data = data_list[0]
items = first_data.get('items', [])
if not items:
logger.warning("微信指数响应中没有items数据")
return []
first_item = items[0]
item_info = first_item.get('itemInfo', {})
index_data = item_info.get('data', [])
if not index_data:
logger.warning("微信指数响应中没有指数数据")
return []
logger.info(f"成功提取到 {len(index_data)} 条微信指数数据")
return index_data
except Exception as e:
logger.error(f"提取微信指数数据时发生错误: {e}")
return []
@staticmethod
def calculate_30_day_average(index_data: List[Dict[str, Any]]) -> float:
"""
计算近30天微信指数平均值
Args:
index_data: 包含指数和时间戳的数据列表
Returns:
近30天的平均指数值
"""
try:
if not index_data:
logger.warning("没有指数数据用于计算平均值")
return 0.0
# 取最近30条数据如果数据不足30条则使用所有数据
recent_data = index_data[-30:] if len(index_data) >= 30 else index_data
# 提取指数值并转换为数字
index_values = []
for item in recent_data:
try:
index_str = item.get('index', '0')
index_value = float(index_str) if index_str else 0.0
index_values.append(index_value)
except (ValueError, TypeError) as e:
logger.warning(f"无法转换指数值 '{item.get('index')}': {e}")
index_values.append(0.0)
if not index_values:
logger.warning("没有有效的指数值用于计算")
return 0.0
# 计算平均值
total_sum = sum(index_values)
count = len(index_values)
average = total_sum / count
logger.info(f"计算微信指数平均值: {count}天数据,总和={total_sum},平均值={average:.2f}")
return round(average, 2)
except Exception as e:
logger.error(f"计算微信指数平均值时发生错误: {e}")
return 0.0
@classmethod
def process_wechat_index_response(cls, api_response: Dict[str, Any]) -> float:
"""
处理微信指数API响应返回近30天平均值
Args:
api_response: 微信指数API的完整响应
Returns:
近30天微信指数平均值
"""
try:
# 提取指数数据
index_data = cls.extract_index_data(api_response)
# 计算平均值
average_index = cls.calculate_30_day_average(index_data)
return average_index
except Exception as e:
logger.error(f"处理微信指数响应时发生错误: {e}")
return 0.0
# 创建全局实例
wechat_index_calculator = WeChatIndexCalculator()