diff --git a/app/api/v1/app_valuations/app_valuations.py b/app/api/v1/app_valuations/app_valuations.py index 754641c..6db3ee2 100644 --- a/app/api/v1/app_valuations/app_valuations.py +++ b/app/api/v1/app_valuations/app_valuations.py @@ -1,3 +1,5 @@ +from random import random + from fastapi import APIRouter, Depends, HTTPException, status from typing import Optional, List, Dict, Any import json @@ -100,13 +102,14 @@ async def calculate_valuation( """ try: start_ts = time.monotonic() - logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None)) + logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, + getattr(data, 'asset_name', None), getattr(data, 'industry', None)) # 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code) esg_obj = None industry_obj = None policy_obj = None - + try: esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0) except Exception as e: @@ -135,10 +138,21 @@ async def calculate_valuation( # 政策匹配度 input_data_by_b1["policy_match_score"] = policy_match_score + # 侵权分 默认 6 + try: + judicial_data = universal_api.query_judicial_data(data.institution) + _data = judicial_data["data"]["count"]["ktggCount"] + if _data > 0: + infringement_score = 4.0 + else: + infringement_score = 10.0 + except: + infringement_score = 0.0 + input_data_by_b1["infringement_score"] = infringement_score # 获取专利信息 TODO 参数 try: - patent_data = universal_api.query_patent_info("未找到 企业名称、企业统代、企业注册号") + patent_data = universal_api.query_patent_info(data.industry) patent_dict = patent_data if isinstance(patent_data, dict) else {} inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {} data_list = inner_data.get("dataList", []) @@ -148,7 +162,8 @@ async def calculate_valuation( # 发展潜力D相关参数 专利数量 # 查询匹配申请号的记录集合 - matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] + matched = [item for item in data_list if + isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] if matched: patent_count = calculate_patent_usage_score(len(matched)) input_data_by_b1["patent_count"] = float(patent_count) @@ -163,9 +178,15 @@ async def calculate_valuation( input_data_by_b2 = await _extract_calculation_params_b2(data) # 提取 风险调整系数B3 计算参数 input_data_by_b3 = await _extract_calculation_params_b3(data) - # 提取 市场估值C 参数 - input_data_by_c = await _extract_calculation_params_c(data) + if infringement_score == 10.0: + input_data_by_b3["lawsuit_status"] = "无诉讼状态" + if 0 < infringement_score < 4.0: + input_data_by_b3["lawsuit_status"] = "已解决诉讼" + else: + input_data_by_b3["lawsuit_status"] = "未解决诉讼" + # 提取 市场估值C 参数 + input_data_by_c = await _extract_calculation_params_c(data) input_data = { # 模型估值B 相关参数 @@ -221,7 +242,8 @@ async def calculate_valuation( }, 'market_data': list(input_data.get('market_data', {}).keys()), }, - drp_result=drp_result + drp_result=drp_result, + status='approved' # 计算成功,设置为approved状态 ) # 组装返回 @@ -248,6 +270,21 @@ async def calculate_valuation( import traceback print(traceback.format_exc()) logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e)) + + # 计算失败时也创建记录,状态设置为failed + try: + result = await user_valuation_controller.create_valuation( + user_id=user_id, + data=data, + calculation_result=None, + calculation_input=None, + drp_result=None, + status='rejected' # 计算失败,设置为rejected状态 + ) + logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id) + except Exception as create_error: + logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error)) + raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}") @@ -269,10 +306,6 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, # 普及地域分值 默认 7分 popularity_score = calculate_popularity_score(data.application_coverage) - # 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分) - infringement_score = calculate_infringement_score("无侵权信息") - infringement_score = 0 - # 创新投入比 = (研发费用/营收) * 100 try: rd_investment = float(data.rd_investment or 0) @@ -281,24 +314,22 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, except (ValueError, TypeError): innovation_ratio = 0.0 - # 流量因子B12相关参数 # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API baidu_index = 0 - + # 获取微信指数并计算近30天平均值 try: - 1/0 wechat_index_response = universal_api.wx_index(data.asset_name) wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response) logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}") except Exception as e: logger.error(f"获取微信指数失败: {e}") wechat_index = 0 - + weibo_index = 0 - search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值,实际应从API获取 - + search_index_s1 = calculate_search_index_s1(baidu_index, wechat_index, weibo_index) # 默认值,实际应从API获取 + # 行业均值S2 - 从数据库查询行业数据计算 from app.utils.industry_calculator import calculate_industry_average_s2 industry_average_s2 = await calculate_industry_average_s2(data.industry) @@ -316,9 +347,10 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, implementation_stage_str = data.application_maturity or "成熟应用" # 资金支持 - 需要转换为对应的评分 funding_support_str = data.funding_status or "无资助" - + # 使用PolicyMultiplierB13Calculator来计算评分 - from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import PolicyMultiplierB13Calculator + from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \ + PolicyMultiplierB13Calculator policy_calculator = PolicyMultiplierB13Calculator() implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str) funding_support = policy_calculator.calculate_funding_support_score(funding_support_str) @@ -327,7 +359,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, # 基础价值B11相关参数 'three_year_income': three_year_income, 'popularity_score': popularity_score, - 'infringement_score': infringement_score, + 'innovation_ratio': innovation_ratio, # 流量因子B12相关参数 @@ -344,9 +376,10 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, # 政策乘数B13相关参数 'implementation_stage': implementation_stage, - 'funding_support':funding_support + 'funding_support': funding_support } + # 获取 文化价值B2 相关参数 async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]: """ @@ -357,18 +390,19 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Dict: 计算所需的参数字典 """ # 导入计算器来转换传承人等级 - from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator - + from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \ + LivingHeritageB21Calculator + # 活态传承系数B21 县官参数 living_heritage_calculator = LivingHeritageB21Calculator() inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值 inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level) - - offline_sessions = int(data.offline_activities) #线下传习次数 + + offline_sessions = int(data.offline_activities) # 线下传习次数 # 以下调用API douyin\bilibili\kuaishou douyin_views = 0 - kuaishou_views= 0 - bilibili_views= 0 + kuaishou_views = 0 + bilibili_views = 0 # 跨界合作深度 品牌联名0.3,科技载体0.5,国家外交礼品1.0 cross_border_depth = float(data.cooperation_depth) @@ -390,16 +424,18 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, "normalized_entropy": normalized_entropy, } + # 获取 文化价值B2 相关参数 async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: # 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 - highest_price,lowest_price= data.price_fluctuation - lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) - inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 + price_fluctuation = [float(i) for i in data.price_fluctuation] + highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation) + # lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) + inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 return { "highest_price": highest_price, "lowest_price": lowest_price, - "lawsuit_status": lawsuit_status, + "inheritor_ages": inheritor_ages, } @@ -412,10 +448,11 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, # expert_valuations: 专家估值列表(系统配置) transaction_data: Dict = None manual_bids: List[float] = None - expert_valuations: List[float] = None + # TODO 需要客户确认 三个数值 + expert_valuations = None # 浏览热度分 TODO 需要先确定平台信息 - daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) - collection_count = 50 # 收藏数(默认占位) + daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) + collection_count = 50 # 收藏数(默认占位) # 稀缺性乘数C3 发行量 circulation = data.circulation or '限量' @@ -428,19 +465,20 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, # 计算市场竞价C1 # C1 的实现接受 transaction_data={'weighted_average_price': x} "weighted_average_price": transaction_data, - "manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写) - "expert_valuations": expert_valuations, # 专家估值列表 (系统配置) + "manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写) + "expert_valuations": expert_valuations, # 专家估值列表 (系统配置) # 计算热度系数C2 - "daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取) - "collection_count": collection_count, # 收藏数 - "issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3 - "recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C + "daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取) + "collection_count": collection_count, # 收藏数 + "issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3 + "recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C } + @app_valuations_router.get("/", summary="获取我的估值评估列表") async def get_my_valuations( - query: UserValuationQuery = Depends(), - current_user: AppUser = Depends(get_current_app_user) + query: UserValuationQuery = Depends(), + current_user: AppUser = Depends(get_current_app_user) ): """ @@ -471,8 +509,8 @@ async def get_my_valuations( @app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情") async def get_valuation_detail( - valuation_id: int, - current_user: AppUser = Depends(get_current_app_user) + valuation_id: int, + current_user: AppUser = Depends(get_current_app_user) ): """ 获取指定估值评估的详细信息 @@ -482,13 +520,13 @@ async def get_valuation_detail( user_id=current_user.id, valuation_id=valuation_id ) - + if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="估值评估记录不存在" ) - + # 使用model_dump_json()来正确序列化datetime,然后解析为dict import json result_dict = json.loads(result.model_dump_json()) @@ -504,7 +542,7 @@ async def get_valuation_detail( @app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计") async def get_my_valuation_statistics( - current_user: AppUser = Depends(get_current_app_user) + current_user: AppUser = Depends(get_current_app_user) ): """ 获取当前用户的估值评估统计信息 @@ -518,4 +556,4 @@ async def get_my_valuation_statistics( raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取统计信息失败: {str(e)}" - ) \ No newline at end of file + ) diff --git a/app/api/v1/calculation/calcuation.py b/app/api/v1/calculation/calcuation.py index 4f66978..02026b6 100644 --- a/app/api/v1/calculation/calcuation.py +++ b/app/api/v1/calculation/calcuation.py @@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from typing import Optional, List, Dict, Any import json import asyncio -import time +import time,random from app.controllers.user_valuation import user_valuation_controller from app.schemas.valuation import ( @@ -28,6 +28,7 @@ from app.models.esg import ESG from app.models.industry import Industry from app.models.policy import Policy from app.utils.universal_api_manager import universal_api +from app.utils.wechat_index_calculator import wechat_index_calculator app_valuations_router = APIRouter(tags=["用户端估值评估"]) @@ -146,7 +147,6 @@ async def calculate_valuation( data_list = data_list if isinstance(data_list, list) else [] # 验证 专利剩余年限 # TODO 无法验证 专利剩余保护期>10年(10分),5-10年(7分),<5年(3分); - # 发展潜力D相关参数 专利数量 # 查询匹配申请号的记录集合 matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] @@ -262,7 +262,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, # 普及地域分值 默认 7分 popularity_score = calculate_popularity_score(data.application_coverage) - # 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分) + # 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分) infringement_score = calculate_infringement_score("无侵权信息") infringement_score = 0 @@ -277,9 +277,9 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, # 流量因子B12相关参数 # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API - baidu_index = "暂无" - wechat_index = universal_api.wx_index(data.asset_name) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息 - weibo_index = "暂无" + baidu_index = 0.0 + wechat_index = wechat_index_calculator.process_wechat_index_response(universal_api.wx_index(data.asset_name)) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息 + weibo_index = 0.0 search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值,实际应从API获取 # 行业均值S2 TODO 系统内置 未找到相关内容 industry_average_s2 = 0.0 @@ -367,8 +367,9 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, # 获取 文化价值B2 相关参数 async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: - # 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 - highest_price,lowest_price= data.price_fluctuation + # 过去30天最高价格 过去30天最低价格 + price_fluctuation = [float(i) for i in data.price_fluctuation ] + highest_price,lowest_price= max(price_fluctuation), min(price_fluctuation) lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 return { @@ -387,7 +388,7 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, # expert_valuations: 专家估值列表(系统配置) transaction_data: Dict = None manual_bids: List[float] = None - expert_valuations: List[float] = None + expert_valuations = [random.uniform(0, 1) for _ in range(3)] # 浏览热度分 TODO 需要先确定平台信息 daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) collection_count = 50 # 收藏数(默认占位) diff --git a/app/controllers/user_valuation.py b/app/controllers/user_valuation.py index 5eea7b2..ae71bba 100644 --- a/app/controllers/user_valuation.py +++ b/app/controllers/user_valuation.py @@ -18,11 +18,11 @@ class UserValuationController: def __init__(self): self.model = ValuationAssessment - async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None) -> UserValuationDetail: + async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None, status: str = 'pending') -> UserValuationDetail: """用户创建估值评估""" valuation_data = data.model_dump() valuation_data['user_id'] = user_id - valuation_data['status'] = 'pending' # 默认状态为待审核 + valuation_data['status'] = status # 根据计算结果显示设置状态 # 添加计算结果到数据库 if calculation_result: diff --git a/app/utils/api_config.py b/app/utils/api_config.py index 15fce2e..500a42c 100644 --- a/app/utils/api_config.py +++ b/app/utils/api_config.py @@ -111,21 +111,21 @@ class APIConfig: "path": "/api/douyin/get-video-detail/v2", "method": "GET", "description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", - "required_params": ["token", "videoId"], + "required_params": ["videoId"], "optional_params": [] }, "kuaishou_video_detail": { "path": "/api/kuaishou/get-video-detail/v2", "method": "GET", "description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", - "required_params": ["token", "videoId"], + "required_params": ["videoId"], "optional_params": [] }, "bilibili_video_detail": { "path": "/api/bilibili/get-video-detail/v2", "method": "GET", "description": "该接口用于获取指定B站视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", - "required_params": ["token", "bvid"], + "required_params": [ "bvid"], "optional_params": [] } } diff --git a/app/utils/universal_api_manager.py b/app/utils/universal_api_manager.py index 8e8b03d..9c086d3 100644 --- a/app/utils/universal_api_manager.py +++ b/app/utils/universal_api_manager.py @@ -14,8 +14,10 @@ from typing import Dict, Any, Optional, Union import json import time import urllib.parse + from .api_config import api_config + # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @@ -23,7 +25,7 @@ logger = logging.getLogger(__name__) class UniversalAPIManager: """通用第三方API管理器""" - + def __init__(self): """初始化API管理器""" self.config = api_config @@ -33,36 +35,37 @@ class UniversalAPIManager: 'Accept': 'application/json', 'Content-Type': 'application/json' }) - + def _get_provider_config(self, provider: str) -> Dict[str, Any]: """获取API提供商配置""" config = self.config.get_api_config(provider) if not config: raise ValueError(f"未找到API提供商配置: {provider}") return config - + def _get_endpoint_config(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]: """获取端点配置""" provider_config = self.config.get_api_config(provider) if not provider_config or 'endpoints' not in provider_config: return None return provider_config['endpoints'].get(endpoint) - + def _get_api_key(self, provider: str, endpoint: str) -> Optional[str]: """获取API密钥""" endpoint_config = self._get_endpoint_config(provider, endpoint) if not endpoint_config: return None - + # 优先从端点配置中获取APIKey if 'APIKey' in endpoint_config: return endpoint_config['APIKey'] - + # 如果端点配置中没有,则从提供商配置中获取api_key provider_config = self._get_provider_config(provider) return provider_config.get('api_key') - def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[str, Any]: + def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[ + str, Any]: """ 发送API请求 @@ -79,21 +82,21 @@ class UniversalAPIManager: endpoint_config = self._get_endpoint_config(provider, endpoint) if not endpoint_config: raise ValueError(f"未找到API配置: {provider}/{endpoint}") - + # 获取API密钥 api_key = self._get_api_key(provider, endpoint) if not api_key: raise ValueError(f"未找到API密钥: {provider}/{endpoint}") - + # 获取基础URL provider_config = self.config.get_api_config(provider) base_url = provider_config.get('base_url') if not base_url: raise ValueError(f"未找到基础URL: {provider}") - + # 构建完整URL url = f"{base_url.rstrip('/')}{endpoint_config['path']}" - + # 添加API密钥到参数中 params['APIKey'] = api_key print(params) @@ -111,7 +114,7 @@ class UniversalAPIManager: except requests.exceptions.RequestException as e: logger.error(f"API请求失败: {str(e)}") raise - + def _generate_chinaz_sign(self, date: datetime = None) -> str: """ 生成站长之家API签名 @@ -124,18 +127,18 @@ class UniversalAPIManager: """ if date is None: date = datetime.now() - + # 格式化日期为YYYYMMDD date_str = date.strftime("%Y%m%d") - + # 构建签名字符串 sign_str = f"634xz{date_str}" - + # 生成MD5哈希 md5_hash = hashlib.md5(sign_str.encode('utf-8')).hexdigest() - + return md5_hash - + def _build_url(self, provider: str, endpoint: str, params: Dict[str, Any]) -> str: """ 构建完整的API请求URL @@ -150,14 +153,14 @@ class UniversalAPIManager: """ provider_config = self._get_provider_config(provider) endpoint_config = self._get_endpoint_config(provider, endpoint) - + base_url = provider_config['base_url'] path = endpoint_config['path'] method = endpoint_config['method'] - + # 构建完整URL full_url = f"{base_url}{path}" - + # 处理参数 if method.upper() == 'GET' and params: # 对于GET请求,将参数添加到URL中 @@ -170,13 +173,13 @@ class UniversalAPIManager: url_params['APIKey'] = params['APIKey'] if 'ChinazVer' in params: url_params['ChinazVer'] = params['ChinazVer'] - + if url_params: query_string = urllib.parse.urlencode(url_params) full_url = f"{full_url}?{query_string}" - + return full_url - + def _validate_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> None: """ 验证请求参数 @@ -191,7 +194,7 @@ class UniversalAPIManager: """ endpoint_config = self._get_endpoint_config(provider, endpoint) required_params = endpoint_config.get('required_params', []) - + missing_params = [] for param in required_params: # 如果参数在端点配置中已经定义(如APIKey),则跳过验证 @@ -199,10 +202,10 @@ class UniversalAPIManager: continue if param not in params or params[param] is None: missing_params.append(param) - + if missing_params: raise ValueError(f"缺少必需参数: {', '.join(missing_params)}") - + def _prepare_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """ 准备请求参数,根据不同的API类型添加必要的参数 @@ -218,17 +221,17 @@ class UniversalAPIManager: provider_config = self._get_provider_config(provider) endpoint_config = self._get_endpoint_config(provider, endpoint) prepared_params = params.copy() - + # 根据不同的API提供商添加特定参数 if provider == 'chinaz': # 站长之家API需要签名 if 'sign' not in prepared_params: prepared_params['sign'] = self._generate_chinaz_sign() - + # 添加APIKey参数 if 'APIKey' not in prepared_params and endpoint_config and 'APIKey' in endpoint_config: prepared_params['APIKey'] = endpoint_config['APIKey'] - + elif provider == 'xiaohongshu': # 小红书接口使用 JustOneAPI 平台,需要 token 参数 if 'token' not in prepared_params or not prepared_params['token']: @@ -260,15 +263,14 @@ class UniversalAPIManager: # 这里不需要修改params,Authorization会在make_request中处理 pass - return prepared_params - - def make_request(self, - provider: str, - endpoint: str, - params: Dict[str, Any] = None, - timeout: int = 60, - retries: int = 3) -> Dict[str, Any]: + + def make_request(self, + provider: str, + endpoint: str, + params: Dict[str, Any] = None, + timeout: int = 60, + retries: int = 3) -> Dict[str, Any]: """ 发送API请求 @@ -283,17 +285,17 @@ class UniversalAPIManager: Dict[str, Any]: API响应数据 """ params = params or {} - + # 验证参数 self._validate_params(provider, endpoint, params) - + # 准备参数 prepared_params = self._prepare_params(provider, endpoint, params) - + # 获取端点配置 endpoint_config = self._get_endpoint_config(provider, endpoint) method = endpoint_config['method'].upper() - + # 构建URL url = self._build_url(provider, endpoint, prepared_params) # 发送请求 @@ -315,16 +317,16 @@ class UniversalAPIManager: # 对于Dify API,需要设置Authorization头 provider_config = self._get_provider_config(provider) endpoint_config = self._get_endpoint_config(provider, endpoint) - + # 获取API密钥 api_key = provider_config.get('api_key', 'app-FJXEWdKv63oq1F4rHb4I8kvE') - + # 设置请求头 headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json' } - + response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout) elif provider == 'dajiala': # dajiala是微信指数/极致聊接口,使用普通POST请求 @@ -333,19 +335,19 @@ class UniversalAPIManager: response = self.session.post(url, json=prepared_params, timeout=timeout) else: raise ValueError(f"不支持的HTTP方法: {method}") - + # 检查响应状态 response.raise_for_status() - + # 解析响应 try: result = response.json() except json.JSONDecodeError: result = {'raw_response': response.text} - + logger.info(f"API请求成功: {provider}.{endpoint}") return result - + except requests.exceptions.RequestException as e: logger.warning(f"API请求失败 (尝试 {attempt + 1}/{retries + 1}): {e}") if attempt == retries: @@ -358,7 +360,7 @@ class UniversalAPIManager: } time.sleep(2 ** attempt) # 指数退避 - def wx_index(self,keyword): + def wx_index(self, keyword): """ 微信指数 """ @@ -372,7 +374,7 @@ class UniversalAPIManager: return self.make_request('dajiala', 'web_search', data) - def video_views(self,platform_type,video_id): + def video_views(self, platform_type, video_id): """ 平台视频播放量 args: @@ -389,7 +391,6 @@ class UniversalAPIManager: return self.make_request('justoneapi', 'platform_type', params) - # 站长之家API的便捷方法 def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]: """查询企业软件著作权""" @@ -420,7 +421,7 @@ class UniversalAPIManager: 'ChinazVer': chinaz_ver } return self.make_request('chinaz', 'judgement', params) - + # 小红书便捷方法 def get_xiaohongshu_note_detail(self, note_id: str) -> Dict[str, Any]: """ @@ -433,18 +434,18 @@ class UniversalAPIManager: Dict[str, Any]: 笔记详情数据 """ params = {'noteId': note_id} - + return self.make_request('xiaohongshu', 'xiaohongshu_note_detail', params) # 极致聊便捷方法 def search_jizhiliao_index( - self, - keyword: str, - mode: int = 2, - business_type: int = 8192, - sub_search_type: int = 0, - key: Optional[str] = None, - verifycode: Optional[str] = None, + self, + keyword: str, + mode: int = 2, + business_type: int = 8192, + sub_search_type: int = 0, + key: Optional[str] = None, + verifycode: Optional[str] = None, ) -> Dict[str, Any]: """执行极致聊指数搜索""" params: Dict[str, Any] = { @@ -460,31 +461,40 @@ class UniversalAPIManager: return self.make_request('jizhiliao', 'index_search', params) + def douyin_video_detail(self, video_id: str) -> Dict[str, Any]: + """抖音 视频详情接口""" + params = { + 'videoId': video_id, + } + return self.make_request('justoneapi', 'douyin_video_detail', params) + + + # 通用方法 def list_providers(self) -> list: """列出所有可用的API提供商""" return self.config.list_providers() - + def list_endpoints(self, provider: str) -> list: """列出指定提供商的所有端点""" return self.config.list_endpoints(provider) - + def get_endpoint_info(self, provider: str, endpoint: str) -> Dict[str, Any]: """获取端点详细信息""" return self._get_endpoint_config(provider, endpoint) - + def set_api_key(self, provider: str, api_key: str) -> None: """设置API密钥""" self.config.set_api_key(provider, api_key) - - def add_endpoint(self, - provider: str, - endpoint_name: str, - path: str, - method: str = 'GET', - description: str = '', - required_params: list = None, - optional_params: list = None) -> None: + + def add_endpoint(self, + provider: str, + endpoint_name: str, + path: str, + method: str = 'GET', + description: str = '', + required_params: list = None, + optional_params: list = None) -> None: """添加新的API端点""" self.config.add_endpoint( provider=provider, @@ -498,4 +508,6 @@ class UniversalAPIManager: # 创建全局实例 -universal_api = UniversalAPIManager() \ No newline at end of file + +universal_api = UniversalAPIManager() +