refactor(valuation): 重构估值计算参数提取逻辑 fix: 修复行业均值S2计算中的除零错误 feat(api): 新增微信指数计算工具和行业数据查询工具 feat(schema): 在估值模型中添加计算结果字段 refactor: 优化动态质押率计算中的月交易额解析 fix: 处理日期解析异常时返回默认值 docs: 更新API文档中的估值计算请求示例
131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
"""
|
||
微信指数计算工具
|
||
用于处理微信指数API返回的数据并计算平均值
|
||
"""
|
||
|
||
from typing import Dict, Any, Optional, List
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class WeChatIndexCalculator:
|
||
"""微信指数计算器"""
|
||
|
||
@staticmethod
|
||
def extract_index_data(api_response: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||
"""
|
||
从微信指数API响应中提取指数数据
|
||
|
||
Args:
|
||
api_response: 微信指数API的响应数据
|
||
|
||
Returns:
|
||
包含指数和时间戳的数据列表
|
||
"""
|
||
try:
|
||
# 检查响应状态
|
||
if api_response.get('code') != 0:
|
||
logger.error(f"微信指数API返回错误: {api_response}")
|
||
return []
|
||
|
||
# 提取数据路径: data[0].items[0].itemInfo.data
|
||
data_list = api_response.get('data', [])
|
||
if not data_list:
|
||
logger.warning("微信指数响应中没有数据")
|
||
return []
|
||
|
||
first_data = data_list[0]
|
||
items = first_data.get('items', [])
|
||
if not items:
|
||
logger.warning("微信指数响应中没有items数据")
|
||
return []
|
||
|
||
first_item = items[0]
|
||
item_info = first_item.get('itemInfo', {})
|
||
index_data = item_info.get('data', [])
|
||
|
||
if not index_data:
|
||
logger.warning("微信指数响应中没有指数数据")
|
||
return []
|
||
|
||
logger.info(f"成功提取到 {len(index_data)} 条微信指数数据")
|
||
return index_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"提取微信指数数据时发生错误: {e}")
|
||
return []
|
||
|
||
@staticmethod
|
||
def calculate_30_day_average(index_data: List[Dict[str, Any]]) -> float:
|
||
"""
|
||
计算近30天微信指数平均值
|
||
|
||
Args:
|
||
index_data: 包含指数和时间戳的数据列表
|
||
|
||
Returns:
|
||
近30天的平均指数值
|
||
"""
|
||
try:
|
||
if not index_data:
|
||
logger.warning("没有指数数据用于计算平均值")
|
||
return 0.0
|
||
|
||
# 取最近30条数据(如果数据不足30条,则使用所有数据)
|
||
recent_data = index_data[-30:] if len(index_data) >= 30 else index_data
|
||
|
||
# 提取指数值并转换为数字
|
||
index_values = []
|
||
for item in recent_data:
|
||
try:
|
||
index_str = item.get('index', '0')
|
||
index_value = float(index_str) if index_str else 0.0
|
||
index_values.append(index_value)
|
||
except (ValueError, TypeError) as e:
|
||
logger.warning(f"无法转换指数值 '{item.get('index')}': {e}")
|
||
index_values.append(0.0)
|
||
|
||
if not index_values:
|
||
logger.warning("没有有效的指数值用于计算")
|
||
return 0.0
|
||
|
||
# 计算平均值
|
||
total_sum = sum(index_values)
|
||
count = len(index_values)
|
||
average = total_sum / count
|
||
|
||
logger.info(f"计算微信指数平均值: {count}天数据,总和={total_sum},平均值={average:.2f}")
|
||
return round(average, 2)
|
||
|
||
except Exception as e:
|
||
logger.error(f"计算微信指数平均值时发生错误: {e}")
|
||
return 0.0
|
||
|
||
@classmethod
|
||
def process_wechat_index_response(cls, api_response: Dict[str, Any]) -> float:
|
||
"""
|
||
处理微信指数API响应,返回近30天平均值
|
||
|
||
Args:
|
||
api_response: 微信指数API的完整响应
|
||
|
||
Returns:
|
||
近30天微信指数平均值
|
||
"""
|
||
try:
|
||
# 提取指数数据
|
||
index_data = cls.extract_index_data(api_response)
|
||
|
||
# 计算平均值
|
||
average_index = cls.calculate_30_day_average(index_data)
|
||
|
||
return average_index
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理微信指数响应时发生错误: {e}")
|
||
return 0.0
|
||
|
||
|
||
# 创建全局实例
|
||
wechat_index_calculator = WeChatIndexCalculator() |