更新第三方接口数据和状态

This commit is contained in:
dby 2025-10-10 21:54:47 +08:00
parent f298acb431
commit a6d7d350e0
5 changed files with 186 additions and 135 deletions

View File

@ -1,3 +1,5 @@
from random import random
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List, Dict, Any
import json
@ -100,13 +102,14 @@ async def calculate_valuation(
"""
try:
start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None))
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None
industry_obj = None
policy_obj = None
try:
esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0)
except Exception as e:
@ -135,10 +138,21 @@ async def calculate_valuation(
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"]["count"]["ktggCount"]
if _data > 0:
infringement_score = 4.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info("未找到 企业名称、企业统代、企业注册号")
patent_data = universal_api.query_patent_info(data.industry)
patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", [])
@ -148,7 +162,8 @@ async def calculate_valuation(
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
matched = [item for item in data_list if
isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched:
patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count)
@ -163,9 +178,15 @@ async def calculate_valuation(
input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data)
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
if infringement_score == 10.0:
input_data_by_b3["lawsuit_status"] = "无诉讼状态"
if 0 < infringement_score < 4.0:
input_data_by_b3["lawsuit_status"] = "已解决诉讼"
else:
input_data_by_b3["lawsuit_status"] = "未解决诉讼"
# 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data)
input_data = {
# 模型估值B 相关参数
@ -221,7 +242,8 @@ async def calculate_valuation(
},
'market_data': list(input_data.get('market_data', {}).keys()),
},
drp_result=drp_result
drp_result=drp_result,
status='approved' # 计算成功设置为approved状态
)
# 组装返回
@ -248,6 +270,21 @@ async def calculate_valuation(
import traceback
print(traceback.format_exc())
logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e))
# 计算失败时也创建记录状态设置为failed
try:
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='rejected' # 计算失败设置为rejected状态
)
logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id)
except Exception as create_error:
logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error))
raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}")
@ -269,10 +306,6 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0
# 创新投入比 = (研发费用/营收) * 100
try:
rd_investment = float(data.rd_investment or 0)
@ -281,24 +314,22 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
except (ValueError, TypeError):
innovation_ratio = 0.0
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = 0
# 获取微信指数并计算近30天平均值
try:
1/0
wechat_index_response = universal_api.wx_index(data.asset_name)
wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response)
logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}")
except Exception as e:
logger.error(f"获取微信指数失败: {e}")
wechat_index = 0
weibo_index = 0
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
search_index_s1 = calculate_search_index_s1(baidu_index, wechat_index, weibo_index) # 默认值实际应从API获取
# 行业均值S2 - 从数据库查询行业数据计算
from app.utils.industry_calculator import calculate_industry_average_s2
industry_average_s2 = await calculate_industry_average_s2(data.industry)
@ -316,9 +347,10 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
implementation_stage_str = data.application_maturity or "成熟应用"
# 资金支持 - 需要转换为对应的评分
funding_support_str = data.funding_status or "无资助"
# 使用PolicyMultiplierB13Calculator来计算评分
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import PolicyMultiplierB13Calculator
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \
PolicyMultiplierB13Calculator
policy_calculator = PolicyMultiplierB13Calculator()
implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str)
funding_support = policy_calculator.calculate_funding_support_score(funding_support_str)
@ -327,7 +359,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 基础价值B11相关参数
'three_year_income': three_year_income,
'popularity_score': popularity_score,
'infringement_score': infringement_score,
'innovation_ratio': innovation_ratio,
# 流量因子B12相关参数
@ -344,9 +376,10 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 政策乘数B13相关参数
'implementation_stage': implementation_stage,
'funding_support':funding_support
'funding_support': funding_support
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]:
"""
@ -357,18 +390,19 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
Dict: 计算所需的参数字典
"""
# 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \
LivingHeritageB21Calculator
# 活态传承系数B21 县官参数
living_heritage_calculator = LivingHeritageB21Calculator()
inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值
inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level)
offline_sessions = int(data.offline_activities) #线下传习次数
offline_sessions = int(data.offline_activities) # 线下传习次数
# 以下调用API douyin\bilibili\kuaishou
douyin_views = 0
kuaishou_views= 0
bilibili_views= 0
kuaishou_views = 0
bilibili_views = 0
# 跨界合作深度 品牌联名0.3科技载体0.5国家外交礼品1.0
cross_border_depth = float(data.cooperation_depth)
@ -390,16 +424,18 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
"normalized_entropy": normalized_entropy,
}
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
highest_price,lowest_price= data.price_fluctuation
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
price_fluctuation = [float(i) for i in data.price_fluctuation]
highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation)
# lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return {
"highest_price": highest_price,
"lowest_price": lowest_price,
"lawsuit_status": lawsuit_status,
"inheritor_ages": inheritor_ages,
}
@ -412,10 +448,11 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None
manual_bids: List[float] = None
expert_valuations: List[float] = None
# TODO 需要客户确认 三个数值
expert_valuations = None
# 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位)
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位)
# 稀缺性乘数C3 发行量
circulation = data.circulation or '限量'
@ -428,19 +465,20 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# 计算市场竞价C1
# C1 的实现接受 transaction_data={'weighted_average_price': x}
"weighted_average_price": transaction_data,
"manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写)
"expert_valuations": expert_valuations, # 专家估值列表 (系统配置)
"manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写)
"expert_valuations": expert_valuations, # 专家估值列表 (系统配置)
# 计算热度系数C2
"daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取)
"collection_count": collection_count, # 收藏数
"issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
"daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取)
"collection_count": collection_count, # 收藏数
"issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
}
@app_valuations_router.get("/", summary="获取我的估值评估列表")
async def get_my_valuations(
query: UserValuationQuery = Depends(),
current_user: AppUser = Depends(get_current_app_user)
query: UserValuationQuery = Depends(),
current_user: AppUser = Depends(get_current_app_user)
):
"""
@ -471,8 +509,8 @@ async def get_my_valuations(
@app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情")
async def get_valuation_detail(
valuation_id: int,
current_user: AppUser = Depends(get_current_app_user)
valuation_id: int,
current_user: AppUser = Depends(get_current_app_user)
):
"""
获取指定估值评估的详细信息
@ -482,13 +520,13 @@ async def get_valuation_detail(
user_id=current_user.id,
valuation_id=valuation_id
)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="估值评估记录不存在"
)
# 使用model_dump_json()来正确序列化datetime然后解析为dict
import json
result_dict = json.loads(result.model_dump_json())
@ -504,7 +542,7 @@ async def get_valuation_detail(
@app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计")
async def get_my_valuation_statistics(
current_user: AppUser = Depends(get_current_app_user)
current_user: AppUser = Depends(get_current_app_user)
):
"""
获取当前用户的估值评估统计信息
@ -518,4 +556,4 @@ async def get_my_valuation_statistics(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取统计信息失败: {str(e)}"
)
)

View File

@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List, Dict, Any
import json
import asyncio
import time
import time,random
from app.controllers.user_valuation import user_valuation_controller
from app.schemas.valuation import (
@ -28,6 +28,7 @@ from app.models.esg import ESG
from app.models.industry import Industry
from app.models.policy import Policy
from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"])
@ -146,7 +147,6 @@ async def calculate_valuation(
data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
@ -262,7 +262,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0
@ -277,9 +277,9 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = "暂无"
wechat_index = universal_api.wx_index(data.asset_name) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息
weibo_index = "暂无"
baidu_index = 0.0
wechat_index = wechat_index_calculator.process_wechat_index_response(universal_api.wx_index(data.asset_name)) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息
weibo_index = 0.0
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
# 行业均值S2 TODO 系统内置 未找到相关内容
industry_average_s2 = 0.0
@ -367,8 +367,9 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
# 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
highest_price,lowest_price= data.price_fluctuation
# 过去30天最高价格 过去30天最低价格
price_fluctuation = [float(i) for i in data.price_fluctuation ]
highest_price,lowest_price= max(price_fluctuation), min(price_fluctuation)
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return {
@ -387,7 +388,7 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None
manual_bids: List[float] = None
expert_valuations: List[float] = None
expert_valuations = [random.uniform(0, 1) for _ in range(3)]
# 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位)

View File

@ -18,11 +18,11 @@ class UserValuationController:
def __init__(self):
self.model = ValuationAssessment
async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None) -> UserValuationDetail:
async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None, status: str = 'pending') -> UserValuationDetail:
"""用户创建估值评估"""
valuation_data = data.model_dump()
valuation_data['user_id'] = user_id
valuation_data['status'] = 'pending' # 默认状态为待审核
valuation_data['status'] = status # 根据计算结果显示设置状态
# 添加计算结果到数据库
if calculation_result:

View File

@ -111,21 +111,21 @@ class APIConfig:
"path": "/api/douyin/get-video-detail/v2",
"method": "GET",
"description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"],
"required_params": ["videoId"],
"optional_params": []
},
"kuaishou_video_detail": {
"path": "/api/kuaishou/get-video-detail/v2",
"method": "GET",
"description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"],
"required_params": ["videoId"],
"optional_params": []
},
"bilibili_video_detail": {
"path": "/api/bilibili/get-video-detail/v2",
"method": "GET",
"description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "bvid"],
"required_params": [ "bvid"],
"optional_params": []
}
}

View File

@ -14,8 +14,10 @@ from typing import Dict, Any, Optional, Union
import json
import time
import urllib.parse
from .api_config import api_config
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@ -23,7 +25,7 @@ logger = logging.getLogger(__name__)
class UniversalAPIManager:
"""通用第三方API管理器"""
def __init__(self):
"""初始化API管理器"""
self.config = api_config
@ -33,36 +35,37 @@ class UniversalAPIManager:
'Accept': 'application/json',
'Content-Type': 'application/json'
})
def _get_provider_config(self, provider: str) -> Dict[str, Any]:
"""获取API提供商配置"""
config = self.config.get_api_config(provider)
if not config:
raise ValueError(f"未找到API提供商配置: {provider}")
return config
def _get_endpoint_config(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]:
"""获取端点配置"""
provider_config = self.config.get_api_config(provider)
if not provider_config or 'endpoints' not in provider_config:
return None
return provider_config['endpoints'].get(endpoint)
def _get_api_key(self, provider: str, endpoint: str) -> Optional[str]:
"""获取API密钥"""
endpoint_config = self._get_endpoint_config(provider, endpoint)
if not endpoint_config:
return None
# 优先从端点配置中获取APIKey
if 'APIKey' in endpoint_config:
return endpoint_config['APIKey']
# 如果端点配置中没有则从提供商配置中获取api_key
provider_config = self._get_provider_config(provider)
return provider_config.get('api_key')
def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[str, Any]:
def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[
str, Any]:
"""
发送API请求
@ -79,21 +82,21 @@ class UniversalAPIManager:
endpoint_config = self._get_endpoint_config(provider, endpoint)
if not endpoint_config:
raise ValueError(f"未找到API配置: {provider}/{endpoint}")
# 获取API密钥
api_key = self._get_api_key(provider, endpoint)
if not api_key:
raise ValueError(f"未找到API密钥: {provider}/{endpoint}")
# 获取基础URL
provider_config = self.config.get_api_config(provider)
base_url = provider_config.get('base_url')
if not base_url:
raise ValueError(f"未找到基础URL: {provider}")
# 构建完整URL
url = f"{base_url.rstrip('/')}{endpoint_config['path']}"
# 添加API密钥到参数中
params['APIKey'] = api_key
print(params)
@ -111,7 +114,7 @@ class UniversalAPIManager:
except requests.exceptions.RequestException as e:
logger.error(f"API请求失败: {str(e)}")
raise
def _generate_chinaz_sign(self, date: datetime = None) -> str:
"""
生成站长之家API签名
@ -124,18 +127,18 @@ class UniversalAPIManager:
"""
if date is None:
date = datetime.now()
# 格式化日期为YYYYMMDD
date_str = date.strftime("%Y%m%d")
# 构建签名字符串
sign_str = f"634xz{date_str}"
# 生成MD5哈希
md5_hash = hashlib.md5(sign_str.encode('utf-8')).hexdigest()
return md5_hash
def _build_url(self, provider: str, endpoint: str, params: Dict[str, Any]) -> str:
"""
构建完整的API请求URL
@ -150,14 +153,14 @@ class UniversalAPIManager:
"""
provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint)
base_url = provider_config['base_url']
path = endpoint_config['path']
method = endpoint_config['method']
# 构建完整URL
full_url = f"{base_url}{path}"
# 处理参数
if method.upper() == 'GET' and params:
# 对于GET请求将参数添加到URL中
@ -170,13 +173,13 @@ class UniversalAPIManager:
url_params['APIKey'] = params['APIKey']
if 'ChinazVer' in params:
url_params['ChinazVer'] = params['ChinazVer']
if url_params:
query_string = urllib.parse.urlencode(url_params)
full_url = f"{full_url}?{query_string}"
return full_url
def _validate_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> None:
"""
验证请求参数
@ -191,7 +194,7 @@ class UniversalAPIManager:
"""
endpoint_config = self._get_endpoint_config(provider, endpoint)
required_params = endpoint_config.get('required_params', [])
missing_params = []
for param in required_params:
# 如果参数在端点配置中已经定义如APIKey则跳过验证
@ -199,10 +202,10 @@ class UniversalAPIManager:
continue
if param not in params or params[param] is None:
missing_params.append(param)
if missing_params:
raise ValueError(f"缺少必需参数: {', '.join(missing_params)}")
def _prepare_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
准备请求参数根据不同的API类型添加必要的参数
@ -218,17 +221,17 @@ class UniversalAPIManager:
provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint)
prepared_params = params.copy()
# 根据不同的API提供商添加特定参数
if provider == 'chinaz':
# 站长之家API需要签名
if 'sign' not in prepared_params:
prepared_params['sign'] = self._generate_chinaz_sign()
# 添加APIKey参数
if 'APIKey' not in prepared_params and endpoint_config and 'APIKey' in endpoint_config:
prepared_params['APIKey'] = endpoint_config['APIKey']
elif provider == 'xiaohongshu':
# 小红书接口使用 JustOneAPI 平台,需要 token 参数
if 'token' not in prepared_params or not prepared_params['token']:
@ -260,15 +263,14 @@ class UniversalAPIManager:
# 这里不需要修改paramsAuthorization会在make_request中处理
pass
return prepared_params
def make_request(self,
provider: str,
endpoint: str,
params: Dict[str, Any] = None,
timeout: int = 60,
retries: int = 3) -> Dict[str, Any]:
def make_request(self,
provider: str,
endpoint: str,
params: Dict[str, Any] = None,
timeout: int = 60,
retries: int = 3) -> Dict[str, Any]:
"""
发送API请求
@ -283,17 +285,17 @@ class UniversalAPIManager:
Dict[str, Any]: API响应数据
"""
params = params or {}
# 验证参数
self._validate_params(provider, endpoint, params)
# 准备参数
prepared_params = self._prepare_params(provider, endpoint, params)
# 获取端点配置
endpoint_config = self._get_endpoint_config(provider, endpoint)
method = endpoint_config['method'].upper()
# 构建URL
url = self._build_url(provider, endpoint, prepared_params)
# 发送请求
@ -315,16 +317,16 @@ class UniversalAPIManager:
# 对于Dify API需要设置Authorization头
provider_config = self._get_provider_config(provider)
endpoint_config = self._get_endpoint_config(provider, endpoint)
# 获取API密钥
api_key = provider_config.get('api_key', 'app-FJXEWdKv63oq1F4rHb4I8kvE')
# 设置请求头
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
response = self.session.post(url, json=prepared_params, headers=headers, timeout=timeout)
elif provider == 'dajiala':
# dajiala是微信指数/极致聊接口使用普通POST请求
@ -333,19 +335,19 @@ class UniversalAPIManager:
response = self.session.post(url, json=prepared_params, timeout=timeout)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
# 检查响应状态
response.raise_for_status()
# 解析响应
try:
result = response.json()
except json.JSONDecodeError:
result = {'raw_response': response.text}
logger.info(f"API请求成功: {provider}.{endpoint}")
return result
except requests.exceptions.RequestException as e:
logger.warning(f"API请求失败 (尝试 {attempt + 1}/{retries + 1}): {e}")
if attempt == retries:
@ -358,7 +360,7 @@ class UniversalAPIManager:
}
time.sleep(2 ** attempt) # 指数退避
def wx_index(self,keyword):
def wx_index(self, keyword):
"""
微信指数
"""
@ -372,7 +374,7 @@ class UniversalAPIManager:
return self.make_request('dajiala', 'web_search', data)
def video_views(self,platform_type,video_id):
def video_views(self, platform_type, video_id):
"""
平台视频播放量
args:
@ -389,7 +391,6 @@ class UniversalAPIManager:
return self.make_request('justoneapi', 'platform_type', params)
# 站长之家API的便捷方法
def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
"""查询企业软件著作权"""
@ -420,7 +421,7 @@ class UniversalAPIManager:
'ChinazVer': chinaz_ver
}
return self.make_request('chinaz', 'judgement', params)
# 小红书便捷方法
def get_xiaohongshu_note_detail(self, note_id: str) -> Dict[str, Any]:
"""
@ -433,18 +434,18 @@ class UniversalAPIManager:
Dict[str, Any]: 笔记详情数据
"""
params = {'noteId': note_id}
return self.make_request('xiaohongshu', 'xiaohongshu_note_detail', params)
# 极致聊便捷方法
def search_jizhiliao_index(
self,
keyword: str,
mode: int = 2,
business_type: int = 8192,
sub_search_type: int = 0,
key: Optional[str] = None,
verifycode: Optional[str] = None,
self,
keyword: str,
mode: int = 2,
business_type: int = 8192,
sub_search_type: int = 0,
key: Optional[str] = None,
verifycode: Optional[str] = None,
) -> Dict[str, Any]:
"""执行极致聊指数搜索"""
params: Dict[str, Any] = {
@ -460,31 +461,40 @@ class UniversalAPIManager:
return self.make_request('jizhiliao', 'index_search', params)
def douyin_video_detail(self, video_id: str) -> Dict[str, Any]:
"""抖音 视频详情接口"""
params = {
'videoId': video_id,
}
return self.make_request('justoneapi', 'douyin_video_detail', params)
# 通用方法
def list_providers(self) -> list:
"""列出所有可用的API提供商"""
return self.config.list_providers()
def list_endpoints(self, provider: str) -> list:
"""列出指定提供商的所有端点"""
return self.config.list_endpoints(provider)
def get_endpoint_info(self, provider: str, endpoint: str) -> Dict[str, Any]:
"""获取端点详细信息"""
return self._get_endpoint_config(provider, endpoint)
def set_api_key(self, provider: str, api_key: str) -> None:
"""设置API密钥"""
self.config.set_api_key(provider, api_key)
def add_endpoint(self,
provider: str,
endpoint_name: str,
path: str,
method: str = 'GET',
description: str = '',
required_params: list = None,
optional_params: list = None) -> None:
def add_endpoint(self,
provider: str,
endpoint_name: str,
path: str,
method: str = 'GET',
description: str = '',
required_params: list = None,
optional_params: list = None) -> None:
"""添加新的API端点"""
self.config.add_endpoint(
provider=provider,
@ -498,4 +508,6 @@ class UniversalAPIManager:
# 创建全局实例
universal_api = UniversalAPIManager()
universal_api = UniversalAPIManager()