""" 微信指数计算工具 用于处理微信指数API返回的数据并计算平均值 """ from typing import Dict, Any, Optional, List import logging logger = logging.getLogger(__name__) class WeChatIndexCalculator: """微信指数计算器""" @staticmethod def extract_index_data(api_response: Dict[str, Any]) -> List[Dict[str, Any]]: """ 从微信指数API响应中提取指数数据 Args: api_response: 微信指数API的响应数据 Returns: 包含指数和时间戳的数据列表 """ try: # 检查响应状态 if api_response.get('code') != 0: logger.error(f"微信指数API返回错误: {api_response}") return [] # 提取数据路径: data[0].items[0].itemInfo.data data_list = api_response.get('data', []) if not data_list: logger.warning("微信指数响应中没有数据") return [] first_data = data_list[0] items = first_data.get('items', []) if not items: logger.warning("微信指数响应中没有items数据") return [] first_item = items[0] item_info = first_item.get('itemInfo', {}) index_data = item_info.get('data', []) if not index_data: logger.warning("微信指数响应中没有指数数据") return [] logger.info(f"成功提取到 {len(index_data)} 条微信指数数据") return index_data except Exception as e: logger.error(f"提取微信指数数据时发生错误: {e}") return [] @staticmethod def calculate_30_day_average(index_data: List[Dict[str, Any]]) -> float: """ 计算近30天微信指数平均值 Args: index_data: 包含指数和时间戳的数据列表 Returns: 近30天的平均指数值 """ try: if not index_data: logger.warning("没有指数数据用于计算平均值") return 0.0 # 取最近30条数据(如果数据不足30条,则使用所有数据) recent_data = index_data[-30:] if len(index_data) >= 30 else index_data # 提取指数值并转换为数字 index_values = [] for item in recent_data: try: index_str = item.get('index', '0') index_value = float(index_str) if index_str else 0.0 index_values.append(index_value) except (ValueError, TypeError) as e: logger.warning(f"无法转换指数值 '{item.get('index')}': {e}") index_values.append(0.0) if not index_values: logger.warning("没有有效的指数值用于计算") return 0.0 # 计算平均值 total_sum = sum(index_values) count = len(index_values) average = total_sum / count logger.info(f"计算微信指数平均值: {count}天数据,总和={total_sum},平均值={average:.2f}") return round(average, 2) except Exception as e: logger.error(f"计算微信指数平均值时发生错误: {e}") return 0.0 @classmethod def process_wechat_index_response(cls, api_response: Dict[str, Any]) -> float: """ 处理微信指数API响应,返回近30天平均值 Args: api_response: 微信指数API的完整响应 Returns: 近30天微信指数平均值 """ try: # 提取指数数据 index_data = cls.extract_index_data(api_response) # 计算平均值 average_index = cls.calculate_30_day_average(index_data) return average_index except Exception as e: logger.error(f"处理微信指数响应时发生错误: {e}") return 0.0 # 创建全局实例 wechat_index_calculator = WeChatIndexCalculator()