This commit is contained in:
若拙_233 2025-10-10 22:53:23 +08:00
commit 664db4e491
15 changed files with 1183 additions and 188 deletions

View File

@ -1,5 +1,10 @@
from random import random
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional from typing import Optional, List, Dict, Any
import json
import asyncio
import time
from app.controllers.user_valuation import user_valuation_controller from app.controllers.user_valuation import user_valuation_controller
from app.schemas.valuation import ( from app.schemas.valuation import (
@ -9,93 +14,471 @@ from app.schemas.valuation import (
UserValuationOut, UserValuationOut,
UserValuationDetail UserValuationDetail
) )
from app.models.user import AppUser
from app.schemas.base import Success, SuccessExtra from app.schemas.base import Success, SuccessExtra
from app.utils.app_user_jwt import get_current_app_user from app.utils.app_user_jwt import get_current_app_user_id, get_current_app_user
from app.utils.calculation_engine import FinalValueACalculator
from app.utils.calculation_engine.drp import DynamicPledgeRateCalculator
from app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11 import calculate_popularity_score, \
calculate_infringement_score, calculate_patent_usage_score
from app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12 import calculate_search_index_s1
from app.log.log import logger
from app.models.esg import ESG
from app.models.industry import Industry
from app.models.policy import Policy
from app.models.user import AppUser
from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"]) app_valuations_router = APIRouter(tags=["用户端估值评估"])
@app_valuations_router.post("/", summary="创建估值评估") @app_valuations_router.post("/", summary="创建估值评估")
async def create_valuation( async def calculate_valuation(
data: UserValuationCreate, data: UserValuationCreate,
current_user: AppUser = Depends(get_current_app_user) user_id: int = Depends(get_current_app_user_id)
): ):
""" """
用户创建估值评估申请 计算估值评估
根据用户提交的估值评估数据调用计算引擎进行经济价值B1计算
请求示例JSON (仅包含用户填写部分):
{
"asset_name": "传统刺绣工艺",
"institution": "某文化传承机构",
"industry": "传统手工艺",
// 财务状况 (用户填写)
"annual_revenue": "500", // 近12个月机构营收/万元
"rd_investment": "50", // 近12个月机构研发投入/万元
"three_year_income": [400, 450, 500], // 近三年机构收益/万元
// 非遗等级与技术 (用户填写)
"inheritor_level": "国家级传承人", // 非遗传承人等级
"inheritor_ages": [45, 60, 75], // 传承人年龄列表
"heritage_level": "国家级", // 非遗等级
"patent_application_no": "CN202310123456.7", // 专利申请号
"patent_remaining_years": "15", // 专利剩余年限
"historical_evidence": { // 历史证明证据及数量
"历史文献": 3,
"考古发现": 2,
"传承谱系": 5
},
"pattern_images": ["demo.jpg"], // 非遗纹样图片
// 非遗应用与推广 (用户填写)
"application_maturity": "成熟应用", // 应用成熟度
"application_coverage": "全国覆盖", // 应用覆盖范围
"cooperation_depth": "0.5", // 跨界合作深度
"offline_activities": "12", // 近12个月线下宣讲活动次数
"online_accounts": [ // 线上宣传账号信息
{"platform": "抖音", "account": "传统刺绣大师"},
{"platform": "微博", "account": "非遗传承人"}
],
// 市场信息 (用户填写)
"sales_volume": "1000", // 近12个月销售量
"link_views": "5000", // 近12个月链接浏览量
"circulation": "限量", // 发行量
"last_market_activity": "2024-01-15", // 最近一次市场活动时间
"price_fluctuation": [95.0, 105.0], // 近30天价格波动区间
"manual_bids": [48000.0, 50000.0, 52000.0], // 手动收集的竞价列表
// 政策相关 (用户填写)
"funding_status": "国家级资助", // 资金支持情况
"implementation_stage": "成熟应用" // 实施阶段
}
API获取参数 (系统自动获取无需用户填写):
- 搜索指数: 百度微信微博搜索指数
- 社交媒体数据: 点赞数评论数转发数粉丝数
- 交易数据: 近3个月加权平均价格
- 热度数据: 近7日日均浏览量收藏数
- ESG评分: 根据行业自动匹配
- 行业系数: 根据行业ROE计算
- 政策匹配度: 根据行业自动匹配
- 专利验证: 通过API验证专利有效性
- 侵权记录: 通过API查询侵权诉讼历史
""" """
try: try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e))
esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0)
# 根据行业查询 行业修正系数与ROE
try:
industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e))
fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0
# 根据行业查询 政策匹配度
try:
policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e))
policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0
# 提取 经济价值B1 计算参数
input_data_by_b1 = await _extract_calculation_params_b1(data)
# ESG关联价值 ESG分 (0-10分)
input_data_by_b1["esg_score"] = esg_score
# 行业修正系数I
input_data_by_b1["industry_coefficient"] = fix_num_score
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"]["count"]["ktggCount"]
if _data > 0:
infringement_score = 4.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info(data.industry)
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if
isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count)
else:
input_data_by_b1["patent_count"] = 0.0
input_data_by_b1["patent_score"] = 0
except Exception as e:
logger.warning("valuation.patent_api_error err={}", repr(e))
input_data_by_b1["patent_count"] = 0.0
# 提取 文化价值B2 计算参数
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
if infringement_score == 10.0:
input_data_by_b3["lawsuit_status"] = "无诉讼状态"
if 0 < infringement_score < 4.0:
input_data_by_b3["lawsuit_status"] = "已解决诉讼"
else:
input_data_by_b3["lawsuit_status"] = "未解决诉讼"
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
"model_data": {
# 经济价值B1 参数
"economic_data": input_data_by_b1,
# 文化价值B2 参数
"cultural_data": input_data_by_b2,
# 风险调整参数 B3
"risky_data": input_data_by_b3,
},
# 市场估值C 参数
"market_data": input_data_by_c,
}
calculator = FinalValueACalculator()
# 计算最终估值A统一计算
calculation_result = calculator.calculate_complete_final_value_a(input_data)
# 计算动态质押
drp_c = DynamicPledgeRateCalculator()
'''
monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级
'''
# 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值
try:
duration_ms = int((time.monotonic() - start_ts) * 1000)
logger.info(
"valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}",
user_id,
duration_ms,
calculation_result.get('model_value_b'),
calculation_result.get('market_value_c'),
calculation_result.get('final_value_ab'),
)
except Exception:
pass
# 创建估值评估记录
result = await user_valuation_controller.create_valuation( result = await user_valuation_controller.create_valuation(
user_id=current_user.id, user_id=user_id,
data=data data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result,
status='approved' # 计算成功设置为approved状态
) )
# 使用model_dump_json()来正确序列化datetime然后解析为dict
import json # 组装返回
result_dict = json.loads(result.model_dump_json()) result_dict = json.loads(result.model_dump_json())
# 开始计算 估值 信息 # "calculation_result": {
# 1 # 经济价值B1模块: EconomicValueB1Calculator | BasicValueB11Calculator | TrafficFactorB12Calculator | PolicyMultiplierB13Calculator # "model_value_b": 660.1534497474814,
# 1.1 EconomicValueB1Calculator # "market_value_c": 8800.0,
# input_data = { # "final_value_ab": 3102.107414823237
# # 基础价值B11相关参数 # }
# 'three_year_income': data.three_year_income, result_dict['calculation_result'] = calculation_result
# 'patent_score': data.pa, # 专利分
# 'popularity_score': data.popularity_score, # 普及地域分值
# 'infringement_score': data.infringement_score, # 侵权分
# 'innovation_ratio': data.innovation_ratio,
# 'esg_score':data.esg_score,
# 'industry_coefficient':data.industry_coefficient,
# # 流量因子B12相关参数 result_dict['calculation_input'] = {
# 'search_index_s1': 4500.0, 'model_data': {
# 'industry_average_s2': 5000.0, 'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
# # 'social_media_spread_s3': social_media_spread_s3, 'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
# 'likes': 4, # 点赞 'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
# 'comments': 5, # 评论 },
# 'shares': 6, # 转发 'market_data': list(input_data.get('market_data', {}).keys()),
# 'followers': 7, # 粉丝数 }
# 'click_count': 1000,# 点击量 return Success(data=result_dict, msg="估值计算完成")
# 'view_count': 100, # 内容浏览量
# # 政策乘数B13相关参数
# 'policy_match_score': 10.0, # 政策匹配度
# 'implementation_stage': 10.0, # 实施阶段评分
# 'funding_support': 10.0 # 资金支持度
# }
# 1.2 BasicValueB11Calculator
# 1.3 TrafficFactorB12Calculator
# 1.4 PolicyMultiplierB13Calculator
# 2 # 文化价值B2模块: CulturalValueB2Calculator | LivingHeritageB21Calculator | PatternGeneB22Calculator
# 2.1 CulturalValueB2Calculator
# 2.2 LivingHeritageB21Calculator
# 2.3 PatternGeneB22Calculator
# 3 # 风险调整系数B3模块: RiskAdjustmentB3Calculator
# 3.1 RiskAdjustmentB3Calculator
# 4 # 市场估值C模块: MarketValueCCalculator | MarketBiddingC1Calculator | HeatCoefficientC2Calculator | ScarcityMultiplierC3Calculator | TemporalDecayC4Calculator
# 4.1 MarketValueCCalculator
# 4.2 MarketBiddingC1Calculator
# 4.3 HeatCoefficientC2Calculator
# 4.4 ScarcityMultiplierC3Calculator
# 4.5 TemporalDecayC4Calculator
# 5 # 最终估值A模块: FinalValueACalculator
# 5.1 FinalValueACalculator
return Success(data=result_dict, msg="估值评估申请提交成功")
except Exception as e: except Exception as e:
raise HTTPException( import traceback
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, print(traceback.format_exc())
detail=f"创建估值评估失败: {str(e)}" logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e))
)
# 计算失败时也创建记录状态设置为failed
try:
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='rejected' # 计算失败设置为rejected状态
)
logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id)
except Exception as create_error:
logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error))
raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}")
async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
"""
从用户提交的数据中提取计算所需的参数
Args:
data: 用户提交的估值评估数据
Returns:
Dict: 计算所需的参数字典
"""
# 基础价值B11相关参数
# 财务价值所需数据 从近三年收益计算
three_year_income = data.three_year_income or [0, 0, 0]
# 法律强度L相关参数
# 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage)
# 创新投入比 = (研发费用/营收) * 100
try:
rd_investment = float(data.rd_investment or 0)
annual_revenue = float(data.annual_revenue or 1) # 避免除零
innovation_ratio = (rd_investment / annual_revenue) * 100 if annual_revenue > 0 else 0
except (ValueError, TypeError):
innovation_ratio = 0.0
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = 0
# 获取微信指数并计算近30天平均值
try:
wechat_index_response = universal_api.wx_index(data.asset_name)
wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response)
logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}")
except Exception as e:
logger.error(f"获取微信指数失败: {e}")
wechat_index = 0
weibo_index = 0
search_index_s1 = calculate_search_index_s1(baidu_index, wechat_index, weibo_index) # 默认值实际应从API获取
# 行业均值S2 - 从数据库查询行业数据计算
from app.utils.industry_calculator import calculate_industry_average_s2
industry_average_s2 = await calculate_industry_average_s2(data.industry)
# 社交媒体传播度S3 - TODO 需要使用第三方API,click_count view_count 未找到对应参数
# likes: 点赞数(API获取)
# comments: 评论数(API获取)
# shares: 转发数(API获取)
# followers: 粉丝数
# click_count: 商品链接点击量(用户填写) sales_volume 使用 sales_volume 销售量
# view_count: 内容浏览量(用户填写) link_views
# 政策乘数B13相关参数
# 政策契合度评分P - 根据行业和资助情况计算
# 实施阶段 - 需要转换为对应的评分
implementation_stage_str = data.application_maturity or "成熟应用"
# 资金支持 - 需要转换为对应的评分
funding_support_str = data.funding_status or "无资助"
# 使用PolicyMultiplierB13Calculator来计算评分
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \
PolicyMultiplierB13Calculator
policy_calculator = PolicyMultiplierB13Calculator()
implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str)
funding_support = policy_calculator.calculate_funding_support_score(funding_support_str)
return {
# 基础价值B11相关参数
'three_year_income': three_year_income,
'popularity_score': popularity_score,
'innovation_ratio': innovation_ratio,
# 流量因子B12相关参数
'search_index_s1': search_index_s1,
'industry_average_s2': industry_average_s2,
'social_media_spread_s3': 0.0,
# 这些社交数据暂未来源置为0后续接入API再填充
'likes': 0,
'comments': 0,
'shares': 0,
# followers 非当前计算用键,先移除避免干扰
# click_count 与 view_count 目前未参与计算,先移除
# 政策乘数B13相关参数
'implementation_stage': implementation_stage,
'funding_support': funding_support
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]:
"""
argrg:
data: 用户提交的估值评估数据
retus:
Dict: 计算所需的参数字典
"""
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \
LivingHeritageB21Calculator
# 活态传承系数B21 县官参数
living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = int(data.offline_activities) # 线下传习次数
# 以下调用API douyin\bilibili\kuaishou
douyin_views = 0
kuaishou_views = 0
bilibili_views = 0
# 跨界合作深度 品牌联名0.3科技载体0.5国家外交礼品1.0
cross_border_depth = float(data.cooperation_depth)
# 纹样基因值B22相关参数
# 以下三项需由后续模型/服务计算;此处提供默认可计算占位
historical_inheritance = 0.8
structure_complexity = 0.75
normalized_entropy = 0.85
return {
"inheritor_level_coefficient": inheritor_level_coefficient,
"offline_sessions": offline_sessions,
"douyin_views": douyin_views,
"kuaishou_views": kuaishou_views,
"bilibili_views": bilibili_views,
"cross_border_depth": cross_border_depth,
"historical_inheritance": historical_inheritance,
"structure_complexity": structure_complexity,
"normalized_entropy": normalized_entropy,
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
price_fluctuation = [float(i) for i in data.price_fluctuation]
highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation)
# lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return {
"highest_price": highest_price,
"lowest_price": lowest_price,
"inheritor_ages": inheritor_ages,
}
# 获取 市场估值C 相关参数
async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, Any]:
# 市场竞价C1 TODO 暂无
# transaction_data: 交易数据字典(API获取)
# manual_bids: 手动收集的竞价列表(用户填写)
# expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None
manual_bids: List[float] = None
# TODO 需要客户确认 三个数值
expert_valuations = None
# 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位)
# 稀缺性乘数C3 发行量
circulation = data.circulation or '限量'
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
recent_market_activity = data.last_market_activity
# 如果为空、None或"0",设置默认值
if not recent_market_activity or recent_market_activity == "0":
recent_market_activity = '2024-01-15'
return {
# 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x}
"weighted_average_price": transaction_data,
"manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写)
"expert_valuations": expert_valuations, # 专家估值列表 (系统配置)
# 计算热度系数C2
"daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取)
"collection_count": collection_count, # 收藏数
"issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
}
@app_valuations_router.get("/", summary="获取我的估值评估列表") @app_valuations_router.get("/", summary="获取我的估值评估列表")
async def get_my_valuations( async def get_my_valuations(
query: UserValuationQuery = Depends(), query: UserValuationQuery = Depends(),
current_user: AppUser = Depends(get_current_app_user) current_user: AppUser = Depends(get_current_app_user)
): ):
""" """
@ -126,8 +509,8 @@ async def get_my_valuations(
@app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情") @app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情")
async def get_valuation_detail( async def get_valuation_detail(
valuation_id: int, valuation_id: int,
current_user: AppUser = Depends(get_current_app_user) current_user: AppUser = Depends(get_current_app_user)
): ):
""" """
获取指定估值评估的详细信息 获取指定估值评估的详细信息
@ -137,13 +520,13 @@ async def get_valuation_detail(
user_id=current_user.id, user_id=current_user.id,
valuation_id=valuation_id valuation_id=valuation_id
) )
if not result: if not result:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, status_code=status.HTTP_404_NOT_FOUND,
detail="估值评估记录不存在" detail="估值评估记录不存在"
) )
# 使用model_dump_json()来正确序列化datetime然后解析为dict # 使用model_dump_json()来正确序列化datetime然后解析为dict
import json import json
result_dict = json.loads(result.model_dump_json()) result_dict = json.loads(result.model_dump_json())
@ -159,7 +542,7 @@ async def get_valuation_detail(
@app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计") @app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计")
async def get_my_valuation_statistics( async def get_my_valuation_statistics(
current_user: AppUser = Depends(get_current_app_user) current_user: AppUser = Depends(get_current_app_user)
): ):
""" """
获取当前用户的估值评估统计信息 获取当前用户的估值评估统计信息
@ -173,4 +556,4 @@ async def get_my_valuation_statistics(
raise HTTPException( raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取统计信息失败: {str(e)}" detail=f"获取统计信息失败: {str(e)}"
) )

View File

@ -0,0 +1,261 @@
{
'category': [
{
'type': 0,
'word': '全部'
},
{
'type': 7,
'word': '视频号'
},
{
'type': 2,
'word': '文章'
},
{
'type': 1,
'word': '公众号'
},
{
'type': 262208,
'word': '小程序'
},
{
'extra_kvs': [
{
'key': 'modelName',
'value': 'hunyuan-t1-turbos'
},
{
'key': 'modelType',
'value': 't1'
},
{
'key': 'fastModelType',
'value': 'fast'
}
],
'type': 53,
'word': 'AI搜索'
},
{
'type': 9,
'word': '直播'
},
{
'type': 1024,
'word': '读书'
},
{
'type': 512,
'word': '听一听'
},
{
'type': 384,
'word': '表情'
},
{
'type': 16777728,
'word': '百科'
},
{
'type': 16384,
'word': '新闻'
},
{
'selected': 1,
'type': 8192,
'word': '微信指数'
},
{
'type': 33562630,
'word': '划线'
},
{
'type': 8,
'word': '朋友圈'
}
],
'continueFlag': 0,
'cookies': '{"box_offset":0,"businessType":8192,"cookies_buffer":"UisIYhABGIBAIgzotYTkuqflkI3np7BQAYIBBRAAogEAsgECGACaAwRCAHIA","doc_offset":0,"dup_bf":"","isHomepage":0,"page_cnt":1,"query":"资产名称","scene":98}\n',
'data': [
{
'boxID': '0x40000-15048-',
'boxPos': 1,
'boxPosMerge': 1,
'count': 1,
'items': [
{
'docID': '1-7123206500956315942',
'itemInfo': {
'data': [
{
'index': '113',
'timestamp': 1757433600
},
{
'index': '51',
'timestamp': 1757520000
},
{
'index': '112',
'timestamp': 1757606400
},
{
'index': '26',
'timestamp': 1757692800
},
{
'index': '24',
'timestamp': 1757779200
},
{
'index': '118',
'timestamp': 1757865600
},
{
'index': '46',
'timestamp': 1757952000
},
{
'index': '90',
'timestamp': 1758038400
},
{
'index': '200',
'timestamp': 1758124800
},
{
'index': '70',
'timestamp': 1758211200
},
{
'index': '23',
'timestamp': 1758297600
},
{
'index': '1',
'timestamp': 1758384000
},
{
'index': '44',
'timestamp': 1758470400
},
{
'index': '93',
'timestamp': 1758556800
},
{
'index': '91',
'timestamp': 1758643200
},
{
'index': '6',
'timestamp': 1758729600
},
{
'index': '46',
'timestamp': 1758816000
},
{
'index': '1',
'timestamp': 1758902400
},
{
'index': '66',
'timestamp': 1758988800
},
{
'index': '45',
'timestamp': 1759075200
},
{
'index': '23',
'timestamp': 1759161600
},
{
'index': '22',
'timestamp': 1759248000
},
{
'index': '0',
'timestamp': 1759334400
},
{
'index': '0',
'timestamp': 1759420800
},
{
'index': '22',
'timestamp': 1759507200
},
{
'index': '22',
'timestamp': 1759593600
},
{
'index': '23',
'timestamp': 1759680000
},
{
'index': '1',
'timestamp': 1759766400
},
{
'index': '0',
'timestamp': 1759852800
},
{
'index': '24',
'timestamp': 1759939200
}
],
'title': '资产名称 - 微信指数',
'type': 1058
},
'jumpInfo': {
'jumpType': 2,
'reportId': 'chart:chart:317451',
'userName': 'gh_935b85261f35@app',
'weappUrl': '/page/detail/detail?calculate_word=资产名称'
},
'source': {
'iconUrl': 'http://mmbiz.qpic.cn/mmbiz_png/uic3vZ5pI29sdd0tUo5t4pxmJMsMO9iaxM8EOxTX1VUAWjQibib0juc81kMkUgtB9IibIg58TZocULicPJ7lichq8E1VQ/640?wx_fmt=png&wxfrom=200',
'title': '微信指数'
},
'title': ''
}
],
'real_type': 262144,
'subType': 15048,
'type': 52
}
],
'direction': 2,
'enableQuestionClosely': False,
'experiment': [
{
'key': 's1s_all_prefetch_next_page',
'value': '2'
},
{
'key': 'mmsearch_exp_financial_style',
'value': '2'
}
],
'isAutoPlayVideo': 1,
'isDivide': 0,
'isHomePage': 0,
'lang': 'zh_CN',
'nomoreText': '没有更多的搜索结果',
'offset': 20,
'pageNumber': 1,
'query': '资产名称',
'resultType': 0,
'ret': 0,
'searchID': '1448000677067408154',
'timeStamp': 1760081917,
'code': 0,
'cost_money': 0.5,
'remain_money': 28.5
}

View File

@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
import json import json
import asyncio import asyncio
import time import time,random
from app.controllers.user_valuation import user_valuation_controller from app.controllers.user_valuation import user_valuation_controller
from app.schemas.valuation import ( from app.schemas.valuation import (
@ -28,6 +28,7 @@ from app.models.esg import ESG
from app.models.industry import Industry from app.models.industry import Industry
from app.models.policy import Policy from app.models.policy import Policy
from app.utils.universal_api_manager import universal_api from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"]) app_valuations_router = APIRouter(tags=["用户端估值评估"])
@ -146,7 +147,6 @@ async def calculate_valuation(
data_list = data_list if isinstance(data_list, list) else [] data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限 # 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分) # TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量 # 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合 # 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
@ -191,7 +191,9 @@ async def calculate_valuation(
monthly_amount (float): 月交易额万元 monthly_amount (float): 月交易额万元
heritage_level (str): 非遗等级 heritage_level (str): 非遗等级
''' '''
drp_result = drp_c.calculate_dynamic_pledge_rate(float(data.monthly_transaction_amount),data.heritage_asset_level) # 解析月交易额字符串为数值
monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "")
drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level)
# 结构化日志:关键分值 # 结构化日志:关键分值
try: try:
@ -209,7 +211,17 @@ async def calculate_valuation(
# 创建估值评估记录 # 创建估值评估记录
result = await user_valuation_controller.create_valuation( result = await user_valuation_controller.create_valuation(
user_id=user_id, user_id=user_id,
data=data data=data,
calculation_result=calculation_result,
calculation_input={
'model_data': {
'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()),
'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()),
'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()),
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result
) )
# 组装返回 # 组装返回
@ -250,7 +262,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 普及地域分值 默认 7分 # 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage) popularity_score = calculate_popularity_score(data.application_coverage)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分) # 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
infringement_score = calculate_infringement_score("无侵权信息") infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0 infringement_score = 0
@ -265,9 +277,9 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 流量因子B12相关参数 # 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = "暂无" baidu_index = 0.0
wechat_index = universal_api.wx_index(data.asset_name) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息 wechat_index = wechat_index_calculator.process_wechat_index_response(universal_api.wx_index(data.asset_name)) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息
weibo_index = "暂无" weibo_index = 0.0
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取 search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
# 行业均值S2 TODO 系统内置 未找到相关内容 # 行业均值S2 TODO 系统内置 未找到相关内容
industry_average_s2 = 0.0 industry_average_s2 = 0.0
@ -319,8 +331,14 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
retus: retus:
Dict: 计算所需的参数字典 Dict: 计算所需的参数字典
""" """
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
# 活态传承系数B21 县官参数 # 活态传承系数B21 县官参数
inheritor_level_coefficient = data.inheritor_level # 传承人等级 living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = int(data.offline_activities) #线下传习次数 offline_sessions = int(data.offline_activities) #线下传习次数
# 以下调用API douyin\bilibili\kuaishou # 以下调用API douyin\bilibili\kuaishou
douyin_views = 0 douyin_views = 0
@ -349,8 +367,9 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
# 获取 文化价值B2 相关参数 # 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 # 过去30天最高价格 过去30天最低价格
highest_price,lowest_price= data.price_fluctuation price_fluctuation = [float(i) for i in data.price_fluctuation ]
highest_price,lowest_price= max(price_fluctuation), min(price_fluctuation)
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return { return {
@ -369,7 +388,7 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# expert_valuations: 专家估值列表(系统配置) # expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None transaction_data: Dict = None
manual_bids: List[float] = None manual_bids: List[float] = None
expert_valuations: List[float] = None expert_valuations = [random.uniform(0, 1) for _ in range(3)]
# 浏览热度分 TODO 需要先确定平台信息 # 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位) collection_count = 50 # 收藏数(默认占位)
@ -377,7 +396,10 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
circulation = data.circulation or '限量' circulation = data.circulation or '限量'
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间 # 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
recent_market_activity = data.last_market_activity or '2024-01-15' recent_market_activity = data.last_market_activity
# 如果为空、None或"0",设置默认值
if not recent_market_activity or recent_market_activity == "0":
recent_market_activity = '2024-01-15'
return { return {
# 计算市场竞价C1 # 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x} # C1 的实现接受 transaction_data={'weighted_average_price': x}

View File

@ -18,11 +18,24 @@ class UserValuationController:
def __init__(self): def __init__(self):
self.model = ValuationAssessment self.model = ValuationAssessment
async def create_valuation(self, user_id: int, data: UserValuationCreate) -> UserValuationDetail: async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None, status: str = 'pending') -> UserValuationDetail:
"""用户创建估值评估""" """用户创建估值评估"""
valuation_data = data.model_dump() valuation_data = data.model_dump()
valuation_data['user_id'] = user_id valuation_data['user_id'] = user_id
valuation_data['status'] = 'pending' # 默认状态为待审核 valuation_data['status'] = status # 根据计算结果显示设置状态
# 添加计算结果到数据库
if calculation_result:
valuation_data['model_value_b'] = calculation_result.get('model_value_b')
valuation_data['market_value_c'] = calculation_result.get('market_value_c')
valuation_data['final_value_ab'] = calculation_result.get('final_value_ab')
valuation_data['calculation_result'] = calculation_result
if calculation_input:
valuation_data['calculation_input'] = calculation_input
if drp_result is not None:
valuation_data['dynamic_pledge_rate'] = drp_result
valuation = await self.model.create(**valuation_data) valuation = await self.model.create(**valuation_data)
return await self._to_user_detail(valuation) return await self._to_user_detail(valuation)
@ -141,6 +154,13 @@ class UserValuationController:
legal_risk=valuation.legal_risk, legal_risk=valuation.legal_risk,
base_pledge_rate=valuation.base_pledge_rate, base_pledge_rate=valuation.base_pledge_rate,
flow_correction=valuation.flow_correction, flow_correction=valuation.flow_correction,
# 添加计算结果字段
model_value_b=valuation.model_value_b,
market_value_c=valuation.market_value_c,
final_value_ab=valuation.final_value_ab,
dynamic_pledge_rate=valuation.dynamic_pledge_rate,
calculation_result=valuation.calculation_result,
calculation_input=valuation.calculation_input,
status=valuation.status, status=valuation.status,
admin_notes=valuation.admin_notes, admin_notes=valuation.admin_notes,
created_at=valuation.created_at, created_at=valuation.created_at,

View File

@ -68,6 +68,14 @@ class ValuationAssessment(Model):
base_pledge_rate = fields.CharField(max_length=50, null=True, description="基础质押率") base_pledge_rate = fields.CharField(max_length=50, null=True, description="基础质押率")
flow_correction = fields.CharField(max_length=50, null=True, description="流量修正系数") flow_correction = fields.CharField(max_length=50, null=True, description="流量修正系数")
# 计算结果字段
model_value_b = fields.FloatField(null=True, description="模型估值B万元")
market_value_c = fields.FloatField(null=True, description="市场估值C万元")
final_value_ab = fields.FloatField(null=True, description="最终估值AB万元")
dynamic_pledge_rate = fields.FloatField(null=True, description="动态质押率")
calculation_result = fields.JSONField(null=True, description="完整计算结果JSON")
calculation_input = fields.JSONField(null=True, description="计算输入参数JSON")
# 系统字段 # 系统字段
user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户") user = fields.ForeignKeyField("models.AppUser", related_name="valuations", description="提交用户")
status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)") status = fields.CharField(max_length=20, default="pending", description="评估状态: pending(待审核), approved(已通过), rejected(已拒绝)")

View File

@ -65,6 +65,14 @@ class ValuationAssessmentBase(BaseModel):
legal_risk: Optional[str] = Field(None, description="法律风险-侵权诉讼历史") legal_risk: Optional[str] = Field(None, description="法律风险-侵权诉讼历史")
base_pledge_rate: Optional[str] = Field(None, description="基础质押率") base_pledge_rate: Optional[str] = Field(None, description="基础质押率")
flow_correction: Optional[str] = Field(None, description="流量修正系数") flow_correction: Optional[str] = Field(None, description="流量修正系数")
# 计算结果字段
model_value_b: Optional[float] = Field(None, description="模型估值B万元")
market_value_c: Optional[float] = Field(None, description="市场估值C万元")
final_value_ab: Optional[float] = Field(None, description="最终估值AB万元")
dynamic_pledge_rate: Optional[float] = Field(None, description="动态质押率")
calculation_result: Optional[Dict[str, Any]] = Field(None, description="完整计算结果JSON")
calculation_input: Optional[Dict[str, Any]] = Field(None, description="计算输入参数JSON")
class ValuationAssessmentCreate(ValuationAssessmentBase): class ValuationAssessmentCreate(ValuationAssessmentBase):

View File

@ -18,9 +18,9 @@ class Settings(BaseSettings):
DEBUG: bool = True DEBUG: bool = True
# 服务器配置 # 服务器配置
SERVER_HOST: str = "127.0.0.1" SERVER_HOST: str = "124.222.245.240"
SERVER_PORT: int = 9999 SERVER_PORT: int = 9999
BASE_URL: str = f"http://{SERVER_HOST}:{SERVER_PORT}" BASE_URL: str = f"http://{SERVER_HOST}:8080"
PROJECT_ROOT: str = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) PROJECT_ROOT: str = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
BASE_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, os.pardir)) BASE_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, os.pardir))

View File

@ -48,7 +48,7 @@ class APIConfig:
"optional_params": ["sign", "searchType"] "optional_params": ["sign", "searchType"]
}, },
"judgement": { "judgement": {
"APIKey": os.getenv("CHINAZ_JUDGEMENT_API_KEY", "YOUR_API_KEY"), "APIKey": os.getenv("CHINAZ_JUDGEMENT_API_KEY", "apiuser_quantity_11970d0811cda56260af2e16f1803a8f_b3966d7a0c6e46f2a57fea19f56945db"),
"path": "/v1/1036/judgementdetailv4", "path": "/v1/1036/judgementdetailv4",
"method": "POST", "method": "POST",
"description": "司法综合数据查询", "description": "司法综合数据查询",
@ -111,31 +111,22 @@ class APIConfig:
"path": "/api/douyin/get-video-detail/v2", "path": "/api/douyin/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"], "required_params": ["videoId"],
"optional_params": [] "optional_params": []
}, },
"kuaishou_video_detail": { "kuaishou_video_detail": {
"path": "/api/kuaishou/get-video-detail/v2", "path": "/api/kuaishou/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"], "required_params": ["videoId"],
"optional_params": [] "optional_params": []
}, },
"bilibili_video_detail": { "bilibili_video_detail": {
"path": "/api/bilibili/get-video-detail/v2", "path": "/api/bilibili/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "bvid"], "required_params": [ "bvid"],
"optional_params": [] "optional_params": []
},
"jizhiliao_index_search": {
"path": "/fbmain/monitor/v3/web_search",
"method": "POST",
"description": "极致聊指数搜索",
"api_key": os.getenv("JIZHILIAO_API_KEY", "JZL089ef0b7d0315d96"),
"verifycode": os.getenv("JIZHILIAO_VERIFYCODE", ""),
"required_params": ["keyword", "mode", "BusinessType", "sub_search_type"],
"optional_params": ["key", "verifycode"]
} }
} }
}, },
@ -148,9 +139,11 @@ class APIConfig:
"web_search": { "web_search": {
"path": "/fbmain/monitor/v3/web_search", "path": "/fbmain/monitor/v3/web_search",
"method": "POST", "method": "POST",
"description": "获取微信指数", "description": "极致聊指数搜索",
"required_params": ["keyword", "key"], "api_key": os.getenv("JIZHILIAO_API_KEY", "JZL089ef0b7d0315d96"),
"optional_params": [] "verifycode": os.getenv("JIZHILIAO_VERIFYCODE", ""),
"required_params": ["keyword", "mode", "BusinessType", "sub_search_type"],
"optional_params": ["key", "verifycode"]
} }
} }
}, },

View File

@ -66,6 +66,47 @@ class DynamicPledgeRateCalculator:
else: else:
return 0.0 return 0.0
def parse_monthly_transaction_amount(self, monthly_transaction_str: str) -> float:
"""
解析月交易额字符串转换为数值万元
参数:
monthly_transaction_str (str): 月交易额字符串"月交易额<100万元""100-500万元"
返回:
float: 月交易额数值万元
"""
if not monthly_transaction_str:
return 0.0
# 清理字符串,移除空格和常见前缀
clean_str = monthly_transaction_str.strip().replace("月交易额", "").replace("万元", "")
# 处理不同的格式
if "<100" in clean_str or "小于100" in clean_str:
return 50.0 # 取中间值
elif "100-500" in clean_str or "100万-500万" in clean_str:
return 300.0 # 取中间值
elif ">500" in clean_str or "大于500" in clean_str or "≥500" in clean_str:
return 600.0 # 取一个合理值
elif "500-1000" in clean_str:
return 750.0 # 取中间值
elif ">1000" in clean_str or "大于1000" in clean_str:
return 1200.0 # 取一个合理值
else:
# 尝试提取数字
import re
numbers = re.findall(r'\d+', clean_str)
if numbers:
# 如果有多个数字,取平均值
if len(numbers) >= 2:
return (float(numbers[0]) + float(numbers[1])) / 2
else:
return float(numbers[0])
else:
# 默认值
return 50.0
def get_monthly_transaction_score(self, monthly_amount: float) -> float: def get_monthly_transaction_score(self, monthly_amount: float) -> float:
""" """
根据用户月交易额区间匹配评分 根据用户月交易额区间匹配评分

View File

@ -28,10 +28,10 @@ class TrafficFactorB12Calculator:
float: 流量因子B12 float: 流量因子B12
""" """
# 避免除零和对数计算错误 # 避免除零和对数计算错误
if industry_average_s2 <= 0: if industry_average_s2 == 0:
raise ValueError("行业均值S2必须大于0") raise ValueError("行业均值S2必须大于0")
if search_index_s1 <= 0: if search_index_s1 == 0:
# 如果搜索指数为0或负数使用最小值避免对数计算错误 # 如果搜索指数为0或负数使用最小值避免对数计算错误
search_index_s1 = 1.0 search_index_s1 = 1.0

View File

@ -45,6 +45,11 @@ class TemporalDecayC4Calculator:
# 处理输入日期 # 处理输入日期
if isinstance(target_date, str): if isinstance(target_date, str):
# 如果是数字字符串"0"或空字符串,使用默认日期
if target_date == "0" or target_date.strip() == "":
# 默认为一年前,返回"其他"
return "其他"
# 尝试不同的日期格式 # 尝试不同的日期格式
formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d'] formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d']
date_obj = None date_obj = None
@ -55,9 +60,15 @@ class TemporalDecayC4Calculator:
except ValueError: except ValueError:
continue continue
if date_obj is None: if date_obj is None:
raise ValueError(f"无法解析日期: {target_date}") # 如果无法解析日期,记录警告并返回默认值
print(f"警告: 无法解析日期 '{target_date}',使用默认值'其他'")
return "其他"
elif isinstance(target_date, datetime): elif isinstance(target_date, datetime):
date_obj = target_date.date() date_obj = target_date.date()
elif isinstance(target_date, (int, float)):
# 如果传入的是数字如0使用默认值
print(f"警告: 传入的日期参数是数字 {target_date},使用默认值'其他'")
return "其他"
else: else:
date_obj = target_date date_obj = target_date

View File

@ -0,0 +1,100 @@
"""
行业数据查询和计算工具
"""
import logging
from typing import Optional, Dict, Any
from app.models.industry import Industry
logger = logging.getLogger(__name__)
async def get_industry_data_by_name(industry_name: str) -> Optional[Dict[str, Any]]:
"""
根据行业名称查询行业数据
Args:
industry_name: 行业名称
Returns:
包含行业数据的字典如果未找到则返回None
{
'code': '行业代码',
'name': '行业名称',
'roe': 平均ROE,
'fix_num': 行业修正系数,
'remark': '备注'
}
"""
try:
industry = await Industry.filter(name=industry_name).first()
if industry:
return {
'code': industry.code,
'name': industry.name,
'roe': industry.roe,
'fix_num': industry.fix_num,
'remark': industry.remark
}
else:
logger.warning(f"未找到行业数据: {industry_name}")
return None
except Exception as e:
logger.error(f"查询行业数据失败: {industry_name}, 错误: {str(e)}")
return None
async def calculate_industry_average_s2(industry_name: str) -> float:
"""
计算行业均值S2
Args:
industry_name: 行业名称
Returns:
行业均值S2如果查询失败则返回0.0
"""
try:
industry_data = await get_industry_data_by_name(industry_name)
if industry_data:
# S2 = ROE * 修正系数
roe = industry_data.get('roe', 0.0)
fix_num = industry_data.get('fix_num', 0.0)
s2_value = roe * fix_num
# 确保S2值为正数避免对数计算错误
if s2_value <= 0:
logger.warning(f"行业 {industry_name} S2计算值为负数或零: ROE={roe}, 修正系数={fix_num}, S2={s2_value}使用默认值0.01")
s2_value = 0.01 # 使用小的正数避免对数计算错误
logger.info(f"行业 {industry_name} S2计算: ROE={roe}, 修正系数={fix_num}, S2={s2_value}")
return s2_value
else:
logger.warning(f"未找到行业 {industry_name} 的数据返回默认值0.01")
return 0.01 # 返回小的正数而不是0.0
except Exception as e:
logger.error(f"计算行业均值S2失败: {industry_name}, 错误: {str(e)}")
return 0.01 # 返回小的正数而不是0.0
async def get_all_industries() -> list:
"""
获取所有行业列表
Returns:
行业列表
"""
try:
industries = await Industry.all()
return [
{
'code': industry.code,
'name': industry.name,
'roe': industry.roe,
'fix_num': industry.fix_num,
'remark': industry.remark
}
for industry in industries
]
except Exception as e:
logger.error(f"获取行业列表失败: {str(e)}")
return []

View File

@ -14,8 +14,10 @@ from typing import Dict, Any, Optional, Union
import json import json
import time import time
import urllib.parse import urllib.parse
from .api_config import api_config from .api_config import api_config
# 配置日志 # 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,7 +25,7 @@ logger = logging.getLogger(__name__)
class UniversalAPIManager: class UniversalAPIManager:
"""通用第三方API管理器""" """通用第三方API管理器"""
def __init__(self): def __init__(self):
"""初始化API管理器""" """初始化API管理器"""
self.config = api_config self.config = api_config
@ -33,36 +35,37 @@ class UniversalAPIManager:
'Accept': 'application/json', 'Accept': 'application/json',
'Content-Type': 'application/json' 'Content-Type': 'application/json'
}) })
def _get_provider_config(self, provider: str) -> Dict[str, Any]: def _get_provider_config(self, provider: str) -> Dict[str, Any]:
"""获取API提供商配置""" """获取API提供商配置"""
config = self.config.get_api_config(provider) config = self.config.get_api_config(provider)
if not config: if not config:
raise ValueError(f"未找到API提供商配置: {provider}") raise ValueError(f"未找到API提供商配置: {provider}")
return config return config
def _get_endpoint_config(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]: def _get_endpoint_config(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]:
"""获取端点配置""" """获取端点配置"""
provider_config = self.config.get_api_config(provider) provider_config = self.config.get_api_config(provider)
if not provider_config or 'endpoints' not in provider_config: if not provider_config or 'endpoints' not in provider_config:
return None return None
return provider_config['endpoints'].get(endpoint) return provider_config['endpoints'].get(endpoint)
def _get_api_key(self, provider: str, endpoint: str) -> Optional[str]: def _get_api_key(self, provider: str, endpoint: str) -> Optional[str]:
"""获取API密钥""" """获取API密钥"""
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
if not endpoint_config: if not endpoint_config:
return None return None
# 优先从端点配置中获取APIKey # 优先从端点配置中获取APIKey
if 'APIKey' in endpoint_config: if 'APIKey' in endpoint_config:
return endpoint_config['APIKey'] return endpoint_config['APIKey']
# 如果端点配置中没有则从提供商配置中获取api_key # 如果端点配置中没有则从提供商配置中获取api_key
provider_config = self._get_provider_config(provider) provider_config = self._get_provider_config(provider)
return provider_config.get('api_key') return provider_config.get('api_key')
def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[str, Any]: def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[
str, Any]:
""" """
发送API请求 发送API请求
@ -79,21 +82,21 @@ class UniversalAPIManager:
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
if not endpoint_config: if not endpoint_config:
raise ValueError(f"未找到API配置: {provider}/{endpoint}") raise ValueError(f"未找到API配置: {provider}/{endpoint}")
# 获取API密钥 # 获取API密钥
api_key = self._get_api_key(provider, endpoint) api_key = self._get_api_key(provider, endpoint)
if not api_key: if not api_key:
raise ValueError(f"未找到API密钥: {provider}/{endpoint}") raise ValueError(f"未找到API密钥: {provider}/{endpoint}")
# 获取基础URL # 获取基础URL
provider_config = self.config.get_api_config(provider) provider_config = self.config.get_api_config(provider)
base_url = provider_config.get('base_url') base_url = provider_config.get('base_url')
if not base_url: if not base_url:
raise ValueError(f"未找到基础URL: {provider}") raise ValueError(f"未找到基础URL: {provider}")
# 构建完整URL # 构建完整URL
url = f"{base_url.rstrip('/')}{endpoint_config['path']}" url = f"{base_url.rstrip('/')}{endpoint_config['path']}"
# 添加API密钥到参数中 # 添加API密钥到参数中
params['APIKey'] = api_key params['APIKey'] = api_key
print(params) print(params)
@ -111,7 +114,7 @@ class UniversalAPIManager:
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API请求失败: {str(e)}") logger.error(f"API请求失败: {str(e)}")
raise raise
def _generate_chinaz_sign(self, date: datetime = None) -> str: def _generate_chinaz_sign(self, date: datetime = None) -> str:
""" """
生成站长之家API签名 生成站长之家API签名
@ -124,18 +127,18 @@ class UniversalAPIManager:
""" """
if date is None: if date is None:
date = datetime.now() date = datetime.now()
# 格式化日期为YYYYMMDD # 格式化日期为YYYYMMDD
date_str = date.strftime("%Y%m%d") date_str = date.strftime("%Y%m%d")
# 构建签名字符串 # 构建签名字符串
sign_str = f"634xz{date_str}" sign_str = f"634xz{date_str}"
# 生成MD5哈希 # 生成MD5哈希
md5_hash = hashlib.md5(sign_str.encode('utf-8')).hexdigest() md5_hash = hashlib.md5(sign_str.encode('utf-8')).hexdigest()
return md5_hash return md5_hash
def _build_url(self, provider: str, endpoint: str, params: Dict[str, Any]) -> str: def _build_url(self, provider: str, endpoint: str, params: Dict[str, Any]) -> str:
""" """
构建完整的API请求URL 构建完整的API请求URL
@ -150,14 +153,14 @@ class UniversalAPIManager:
""" """
provider_config = self._get_provider_config(provider) provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
base_url = provider_config['base_url'] base_url = provider_config['base_url']
path = endpoint_config['path'] path = endpoint_config['path']
method = endpoint_config['method'] method = endpoint_config['method']
# 构建完整URL # 构建完整URL
full_url = f"{base_url}{path}" full_url = f"{base_url}{path}"
# 处理参数 # 处理参数
if method.upper() == 'GET' and params: if method.upper() == 'GET' and params:
# 对于GET请求将参数添加到URL中 # 对于GET请求将参数添加到URL中
@ -170,13 +173,13 @@ class UniversalAPIManager:
url_params['APIKey'] = params['APIKey'] url_params['APIKey'] = params['APIKey']
if 'ChinazVer' in params: if 'ChinazVer' in params:
url_params['ChinazVer'] = params['ChinazVer'] url_params['ChinazVer'] = params['ChinazVer']
if url_params: if url_params:
query_string = urllib.parse.urlencode(url_params) query_string = urllib.parse.urlencode(url_params)
full_url = f"{full_url}?{query_string}" full_url = f"{full_url}?{query_string}"
return full_url return full_url
def _validate_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> None: def _validate_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> None:
""" """
验证请求参数 验证请求参数
@ -191,7 +194,7 @@ class UniversalAPIManager:
""" """
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
required_params = endpoint_config.get('required_params', []) required_params = endpoint_config.get('required_params', [])
missing_params = [] missing_params = []
for param in required_params: for param in required_params:
# 如果参数在端点配置中已经定义如APIKey则跳过验证 # 如果参数在端点配置中已经定义如APIKey则跳过验证
@ -199,10 +202,10 @@ class UniversalAPIManager:
continue continue
if param not in params or params[param] is None: if param not in params or params[param] is None:
missing_params.append(param) missing_params.append(param)
if missing_params: if missing_params:
raise ValueError(f"缺少必需参数: {', '.join(missing_params)}") raise ValueError(f"缺少必需参数: {', '.join(missing_params)}")
def _prepare_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: def _prepare_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
""" """
准备请求参数根据不同的API类型添加必要的参数 准备请求参数根据不同的API类型添加必要的参数
@ -218,17 +221,17 @@ class UniversalAPIManager:
provider_config = self._get_provider_config(provider) provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
prepared_params = params.copy() prepared_params = params.copy()
# 根据不同的API提供商添加特定参数 # 根据不同的API提供商添加特定参数
if provider == 'chinaz': if provider == 'chinaz':
# 站长之家API需要签名 # 站长之家API需要签名
if 'sign' not in prepared_params: if 'sign' not in prepared_params:
prepared_params['sign'] = self._generate_chinaz_sign() prepared_params['sign'] = self._generate_chinaz_sign()
# 添加APIKey参数 # 添加APIKey参数
if 'APIKey' not in prepared_params and endpoint_config and 'APIKey' in endpoint_config: if 'APIKey' not in prepared_params and endpoint_config and 'APIKey' in endpoint_config:
prepared_params['APIKey'] = endpoint_config['APIKey'] prepared_params['APIKey'] = endpoint_config['APIKey']
elif provider == 'xiaohongshu': elif provider == 'xiaohongshu':
# 小红书接口使用 JustOneAPI 平台,需要 token 参数 # 小红书接口使用 JustOneAPI 平台,需要 token 参数
if 'token' not in prepared_params or not prepared_params['token']: if 'token' not in prepared_params or not prepared_params['token']:
@ -237,11 +240,12 @@ class UniversalAPIManager:
prepared_params['token'] = api_key prepared_params['token'] = api_key
elif provider == 'dajiala': elif provider == 'dajiala':
# 微信指数 # 微信指数/极致聊接口需要 key 参数
# JustOneAPI需要token参数 endpoint_config = self._get_endpoint_config(provider, endpoint)
api_key = provider_config.get('api_key') if endpoint_config:
if api_key: api_key = endpoint_config.get('api_key')
prepared_params['key'] = api_key if api_key:
prepared_params['key'] = api_key
elif provider == 'jizhiliao': elif provider == 'jizhiliao':
# 极致聊接口需要 key默认从配置读取 # 极致聊接口需要 key默认从配置读取
@ -259,15 +263,14 @@ class UniversalAPIManager:
# 这里不需要修改paramsAuthorization会在make_request中处理 # 这里不需要修改paramsAuthorization会在make_request中处理
pass pass
return prepared_params return prepared_params
def make_request(self, def make_request(self,
provider: str, provider: str,
endpoint: str, endpoint: str,
params: Dict[str, Any] = None, params: Dict[str, Any] = None,
timeout: int = 60, timeout: int = 60,
retries: int = 3) -> Dict[str, Any]: retries: int = 3) -> Dict[str, Any]:
""" """
发送API请求 发送API请求
@ -282,17 +285,17 @@ class UniversalAPIManager:
Dict[str, Any]: API响应数据 Dict[str, Any]: API响应数据
""" """
params = params or {} params = params or {}
# 验证参数 # 验证参数
self._validate_params(provider, endpoint, params) self._validate_params(provider, endpoint, params)
# 准备参数 # 准备参数
prepared_params = self._prepare_params(provider, endpoint, params) prepared_params = self._prepare_params(provider, endpoint, params)
# 获取端点配置 # 获取端点配置
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
method = endpoint_config['method'].upper() method = endpoint_config['method'].upper()
# 构建URL # 构建URL
url = self._build_url(provider, endpoint, prepared_params) url = self._build_url(provider, endpoint, prepared_params)
# 发送请求 # 发送请求
@ -314,34 +317,37 @@ class UniversalAPIManager:
# 对于Dify API需要设置Authorization头 # 对于Dify API需要设置Authorization头
provider_config = self._get_provider_config(provider) provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint) endpoint_config = self._get_endpoint_config(provider, endpoint)
# 获取API密钥 # 获取API密钥
api_key = provider_config.get('api_key', 'app-FJXEWdKv63oq1F4rHb4I8kvE') api_key = provider_config.get('api_key', 'app-FJXEWdKv63oq1F4rHb4I8kvE')
# 设置请求头 # 设置请求头
headers = { headers = {
'Authorization': f'Bearer {api_key}', 'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout) response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout)
elif provider == 'dajiala':
# dajiala是微信指数/极致聊接口使用普通POST请求
response = self.session.post(url, json=prepared_params, timeout=timeout)
else: else:
response = self.session.post(url, json=prepared_params, timeout=timeout) response = self.session.post(url, json=prepared_params, timeout=timeout)
else: else:
raise ValueError(f"不支持的HTTP方法: {method}") raise ValueError(f"不支持的HTTP方法: {method}")
# 检查响应状态 # 检查响应状态
response.raise_for_status() response.raise_for_status()
# 解析响应 # 解析响应
try: try:
result = response.json() result = response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
result = {'raw_response': response.text} result = {'raw_response': response.text}
logger.info(f"API请求成功: {provider}.{endpoint}") logger.info(f"API请求成功: {provider}.{endpoint}")
return result return result
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.warning(f"API请求失败 (尝试 {attempt + 1}/{retries + 1}): {e}") logger.warning(f"API请求失败 (尝试 {attempt + 1}/{retries + 1}): {e}")
if attempt == retries: if attempt == retries:
@ -354,7 +360,7 @@ class UniversalAPIManager:
} }
time.sleep(2 ** attempt) # 指数退避 time.sleep(2 ** attempt) # 指数退避
def wx_index(self,keyword): def wx_index(self, keyword):
""" """
微信指数 微信指数
""" """
@ -365,9 +371,10 @@ class UniversalAPIManager:
"sub_search_type": 0, "sub_search_type": 0,
"verifycode": "" "verifycode": ""
} }
return self.make_request('dajiala', 'web_search', data) return self.make_request('dajiala', 'web_search', data)
def video_views(self,platform_type,video_id): def video_views(self, platform_type, video_id):
""" """
平台视频播放量 平台视频播放量
args: args:
@ -384,7 +391,6 @@ class UniversalAPIManager:
return self.make_request('justoneapi', 'platform_type', params) return self.make_request('justoneapi', 'platform_type', params)
# 站长之家API的便捷方法 # 站长之家API的便捷方法
def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]: def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
"""查询企业软件著作权""" """查询企业软件著作权"""
@ -415,7 +421,7 @@ class UniversalAPIManager:
'ChinazVer': chinaz_ver 'ChinazVer': chinaz_ver
} }
return self.make_request('chinaz', 'judgement', params) return self.make_request('chinaz', 'judgement', params)
# 小红书便捷方法 # 小红书便捷方法
def get_xiaohongshu_note_detail(self, note_id: str) -> Dict[str, Any]: def get_xiaohongshu_note_detail(self, note_id: str) -> Dict[str, Any]:
""" """
@ -428,18 +434,18 @@ class UniversalAPIManager:
Dict[str, Any]: 笔记详情数据 Dict[str, Any]: 笔记详情数据
""" """
params = {'noteId': note_id} params = {'noteId': note_id}
return self.make_request('xiaohongshu', 'xiaohongshu_note_detail', params) return self.make_request('xiaohongshu', 'xiaohongshu_note_detail', params)
# 极致聊便捷方法 # 极致聊便捷方法
def search_jizhiliao_index( def search_jizhiliao_index(
self, self,
keyword: str, keyword: str,
mode: int = 2, mode: int = 2,
business_type: int = 8192, business_type: int = 8192,
sub_search_type: int = 0, sub_search_type: int = 0,
key: Optional[str] = None, key: Optional[str] = None,
verifycode: Optional[str] = None, verifycode: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""执行极致聊指数搜索""" """执行极致聊指数搜索"""
params: Dict[str, Any] = { params: Dict[str, Any] = {
@ -455,31 +461,40 @@ class UniversalAPIManager:
return self.make_request('jizhiliao', 'index_search', params) return self.make_request('jizhiliao', 'index_search', params)
def douyin_video_detail(self, video_id: str) -> Dict[str, Any]:
"""抖音 视频详情接口"""
params = {
'videoId': video_id,
}
return self.make_request('justoneapi', 'douyin_video_detail', params)
# 通用方法 # 通用方法
def list_providers(self) -> list: def list_providers(self) -> list:
"""列出所有可用的API提供商""" """列出所有可用的API提供商"""
return self.config.list_providers() return self.config.list_providers()
def list_endpoints(self, provider: str) -> list: def list_endpoints(self, provider: str) -> list:
"""列出指定提供商的所有端点""" """列出指定提供商的所有端点"""
return self.config.list_endpoints(provider) return self.config.list_endpoints(provider)
def get_endpoint_info(self, provider: str, endpoint: str) -> Dict[str, Any]: def get_endpoint_info(self, provider: str, endpoint: str) -> Dict[str, Any]:
"""获取端点详细信息""" """获取端点详细信息"""
return self._get_endpoint_config(provider, endpoint) return self._get_endpoint_config(provider, endpoint)
def set_api_key(self, provider: str, api_key: str) -> None: def set_api_key(self, provider: str, api_key: str) -> None:
"""设置API密钥""" """设置API密钥"""
self.config.set_api_key(provider, api_key) self.config.set_api_key(provider, api_key)
def add_endpoint(self, def add_endpoint(self,
provider: str, provider: str,
endpoint_name: str, endpoint_name: str,
path: str, path: str,
method: str = 'GET', method: str = 'GET',
description: str = '', description: str = '',
required_params: list = None, required_params: list = None,
optional_params: list = None) -> None: optional_params: list = None) -> None:
"""添加新的API端点""" """添加新的API端点"""
self.config.add_endpoint( self.config.add_endpoint(
provider=provider, provider=provider,
@ -493,4 +508,6 @@ class UniversalAPIManager:
# 创建全局实例 # 创建全局实例
universal_api = UniversalAPIManager()
universal_api = UniversalAPIManager()

View File

@ -0,0 +1,131 @@
"""
微信指数计算工具
用于处理微信指数API返回的数据并计算平均值
"""
from typing import Dict, Any, Optional, List
import logging
logger = logging.getLogger(__name__)
class WeChatIndexCalculator:
"""微信指数计算器"""
@staticmethod
def extract_index_data(api_response: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
从微信指数API响应中提取指数数据
Args:
api_response: 微信指数API的响应数据
Returns:
包含指数和时间戳的数据列表
"""
try:
# 检查响应状态
if api_response.get('code') != 0:
logger.error(f"微信指数API返回错误: {api_response}")
return []
# 提取数据路径: data[0].items[0].itemInfo.data
data_list = api_response.get('data', [])
if not data_list:
logger.warning("微信指数响应中没有数据")
return []
first_data = data_list[0]
items = first_data.get('items', [])
if not items:
logger.warning("微信指数响应中没有items数据")
return []
first_item = items[0]
item_info = first_item.get('itemInfo', {})
index_data = item_info.get('data', [])
if not index_data:
logger.warning("微信指数响应中没有指数数据")
return []
logger.info(f"成功提取到 {len(index_data)} 条微信指数数据")
return index_data
except Exception as e:
logger.error(f"提取微信指数数据时发生错误: {e}")
return []
@staticmethod
def calculate_30_day_average(index_data: List[Dict[str, Any]]) -> float:
"""
计算近30天微信指数平均值
Args:
index_data: 包含指数和时间戳的数据列表
Returns:
近30天的平均指数值
"""
try:
if not index_data:
logger.warning("没有指数数据用于计算平均值")
return 0.0
# 取最近30条数据如果数据不足30条则使用所有数据
recent_data = index_data[-30:] if len(index_data) >= 30 else index_data
# 提取指数值并转换为数字
index_values = []
for item in recent_data:
try:
index_str = item.get('index', '0')
index_value = float(index_str) if index_str else 0.0
index_values.append(index_value)
except (ValueError, TypeError) as e:
logger.warning(f"无法转换指数值 '{item.get('index')}': {e}")
index_values.append(0.0)
if not index_values:
logger.warning("没有有效的指数值用于计算")
return 0.0
# 计算平均值
total_sum = sum(index_values)
count = len(index_values)
average = total_sum / count
logger.info(f"计算微信指数平均值: {count}天数据,总和={total_sum},平均值={average:.2f}")
return round(average, 2)
except Exception as e:
logger.error(f"计算微信指数平均值时发生错误: {e}")
return 0.0
@classmethod
def process_wechat_index_response(cls, api_response: Dict[str, Any]) -> float:
"""
处理微信指数API响应返回近30天平均值
Args:
api_response: 微信指数API的完整响应
Returns:
近30天微信指数平均值
"""
try:
# 提取指数数据
index_data = cls.extract_index_data(api_response)
# 计算平均值
average_index = cls.calculate_30_day_average(index_data)
return average_index
except Exception as e:
logger.error(f"处理微信指数响应时发生错误: {e}")
return 0.0
# 创建全局实例
wechat_index_calculator = WeChatIndexCalculator()

View File

@ -7,7 +7,7 @@ import vue from '@vitejs/plugin-vue'
import Unocss from 'unocss/vite' import Unocss from 'unocss/vite'
// rollup打包分析插件 // rollup打包分析插件
import visualizer from 'rollup-plugin-visualizer' import { visualizer } from 'rollup-plugin-visualizer'
// 压缩 // 压缩
import viteCompression from 'vite-plugin-compression' import viteCompression from 'vite-plugin-compression'