guzhi/app/utils/universal_api_manager.py
邹方成 107e90cbcb feat(third_party_api): 添加Dify工作流支持并优化API管理
重构UniversalAPIManager以支持Dify工作流API
添加DifyWorkflowRequest模型和控制器方法
优化API密钥管理和请求处理逻辑
修改JWT验证方式从HTTPBearer到Header
更新API配置以支持新的Dify端点
2025-10-10 13:04:10 +08:00

496 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通用第三方API管理器
支持多种不同类型的第三方API包括需要签名和不需要签名的API
"""
import os
import hashlib
import requests
import logging
from datetime import datetime
from typing import Dict, Any, Optional, Union
import json
import time
import urllib.parse
from .api_config import api_config
# Configure root logging once at import time and create this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
class UniversalAPIManager:
    """Client facade for several third-party HTTP APIs.

    Provider and endpoint settings are read from the shared ``api_config``
    object. All requests go through a single ``requests.Session``;
    provider-specific credentials (signature, key, token, bearer header) are
    injected just before a request is sent.
    """

    # SECURITY(review): this credential used to be inlined in the Dify request
    # path. It is kept only so that deployments without a configured
    # ``api_key`` keep working; move it into configuration and rotate it.
    _DIFY_FALLBACK_API_KEY = 'app-FJXEWdKv63oq1F4rHb4I8kvE'

    def __init__(self):
        """Bind the shared configuration and create the HTTP session."""
        self.config = api_config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Universal-API-Manager/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        })

    # ------------------------------------------------------------------
    # Configuration lookups
    # ------------------------------------------------------------------
    def _get_provider_config(self, provider: str) -> Dict[str, Any]:
        """Return the configuration of ``provider``.

        Raises:
            ValueError: if the provider has no (or an empty) configuration.
        """
        config = self.config.get_api_config(provider)
        if not config:
            raise ValueError(f"未找到API提供商配置: {provider}")
        return config

    def _get_endpoint_config(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]:
        """Return the configuration of ``endpoint``, or ``None`` if unknown."""
        provider_config = self.config.get_api_config(provider)
        if not provider_config or 'endpoints' not in provider_config:
            return None
        return provider_config['endpoints'].get(endpoint)

    def _require_endpoint_config(self, provider: str, endpoint: str) -> Dict[str, Any]:
        """Return the endpoint configuration or raise ``ValueError`` if missing."""
        endpoint_config = self._get_endpoint_config(provider, endpoint)
        if not endpoint_config:
            raise ValueError(f"未找到API配置: {provider}/{endpoint}")
        return endpoint_config

    def _get_api_key(self, provider: str, endpoint: str) -> Optional[str]:
        """Return the API key for an endpoint.

        The endpoint-level ``APIKey`` wins; otherwise the provider-level
        ``api_key`` is used. Returns ``None`` when the endpoint is unknown.
        """
        endpoint_config = self._get_endpoint_config(provider, endpoint)
        if not endpoint_config:
            return None
        if 'APIKey' in endpoint_config:
            return endpoint_config['APIKey']
        return self._get_provider_config(provider).get('api_key')

    # ------------------------------------------------------------------
    # Request building
    # ------------------------------------------------------------------
    def _generate_chinaz_sign(self, date: Optional[datetime] = None) -> str:
        """Return the Chinaz request signature for ``date``.

        The signature is the hex MD5 digest of ``"634xz"`` followed by the
        date formatted as ``YYYYMMDD``.

        Args:
            date: day to sign for; defaults to ``datetime.now()``.

        Returns:
            str: 32-character hexadecimal MD5 digest.
        """
        if date is None:
            date = datetime.now()
        sign_str = f"634xz{date.strftime('%Y%m%d')}"
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest()

    def _build_url(self, provider: str, endpoint: str, params: Dict[str, Any]) -> str:
        """Build the full request URL for an endpoint.

        GET requests carry all ``params`` in the query string. Chinaz POST
        requests carry only ``APIKey`` and ``ChinazVer`` in the query string;
        every other case gets the bare ``base_url + path``.

        Raises:
            ValueError: if the provider or the endpoint is not configured.
        """
        provider_config = self._get_provider_config(provider)
        endpoint_config = self._require_endpoint_config(provider, endpoint)
        full_url = f"{provider_config['base_url']}{endpoint_config['path']}"
        method = endpoint_config['method'].upper()

        if method == 'GET' and params:
            return f"{full_url}?{urllib.parse.urlencode(params)}"

        if provider == 'chinaz' and method == 'POST':
            url_params = {
                name: params[name]
                for name in ('APIKey', 'ChinazVer')
                if name in params
            }
            if url_params:
                return f"{full_url}?{urllib.parse.urlencode(url_params)}"

        return full_url

    def _validate_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> None:
        """Check that every required parameter of the endpoint is present.

        A required parameter that is itself a key of the endpoint
        configuration (e.g. ``APIKey``) is supplied from configuration and
        therefore not demanded from the caller.

        Raises:
            ValueError: if the endpoint is not configured, or if a required
                parameter is absent or ``None``.
        """
        endpoint_config = self._require_endpoint_config(provider, endpoint)
        missing_params = [
            name
            for name in endpoint_config.get('required_params', [])
            if name not in endpoint_config and params.get(name) is None
        ]
        if missing_params:
            raise ValueError(f"缺少必需参数: {', '.join(missing_params)}")

    def _prepare_params(self, provider: str, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``params`` with provider credentials filled in.

        The caller's dictionary is never modified.

        Raises:
            ValueError: if the provider is not configured.
        """
        provider_config = self._get_provider_config(provider)
        endpoint_config = self._get_endpoint_config(provider, endpoint)
        prepared_params = params.copy()

        if provider == 'chinaz':
            # Chinaz requires a date-based signature and an endpoint-level key.
            if 'sign' not in prepared_params:
                prepared_params['sign'] = self._generate_chinaz_sign()
            if 'APIKey' not in prepared_params and endpoint_config and 'APIKey' in endpoint_config:
                prepared_params['APIKey'] = endpoint_config['APIKey']
        elif provider == 'xiaohongshu':
            # Fill in ``token`` from the provider key when the caller gave none.
            if not prepared_params.get('token'):
                api_key = provider_config.get('api_key')
                if api_key:
                    prepared_params['token'] = api_key
        elif provider == 'dajiala':
            # The configured provider key always overrides a caller-supplied ``key``.
            api_key = provider_config.get('api_key')
            if api_key:
                prepared_params['key'] = api_key
        elif provider == 'jizhiliao':
            # ``key`` and ``verifycode`` default to the configured values.
            if not prepared_params.get('key'):
                api_key = provider_config.get('api_key')
                if api_key:
                    prepared_params['key'] = api_key
            if 'verifycode' not in prepared_params:
                default_verifycode = provider_config.get('verifycode')
                if default_verifycode is not None:
                    prepared_params['verifycode'] = default_verifycode
        # Dify needs no parameter changes: its key travels in the
        # Authorization header, which is set when the request is sent.
        return prepared_params

    def _dify_api_key(self) -> Any:
        """Return the Dify API key, falling back to the legacy built-in key."""
        provider_config = self._get_provider_config('dify')
        if 'api_key' not in provider_config:
            logger.warning("Dify api_key is not configured; using the built-in fallback key")
        return provider_config.get('api_key', self._DIFY_FALLBACK_API_KEY)

    def _send_once(self, provider: str, method: str, url: str,
                   prepared_params: Dict[str, Any], timeout: int):
        """Send a single HTTP request and return the raw response.

        Raises:
            ValueError: if ``method`` is neither ``GET`` nor ``POST``.
        """
        if method == 'GET':
            # Parameters are already encoded into ``url`` by ``_build_url``.
            return self.session.get(url, timeout=timeout)

        if method != 'POST':
            raise ValueError(f"不支持的HTTP方法: {method}")

        if provider == 'chinaz':
            # APIKey/ChinazVer already travel in the URL; the remaining
            # parameters are sent as a form-encoded body.
            body_params = prepared_params.copy()
            body_params.pop('APIKey', None)
            body_params.pop('ChinazVer', None)
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            # The body is deliberately not parsed or printed here, so that an
            # error response is reported by ``raise_for_status`` in the caller.
            return self.session.post(url, data=body_params, headers=headers, timeout=timeout)

        if provider == 'dify':
            headers = {
                'Authorization': f'Bearer {self._dify_api_key()}',
                'Content-Type': 'application/json',
            }
            return self.session.post(url, json=prepared_params, headers=headers, timeout=timeout)

        return self.session.post(url, json=prepared_params, timeout=timeout)

    def make_request(self,
                     provider: str,
                     endpoint: str,
                     params: Dict[str, Any] = None,
                     timeout: int = 60,
                     retries: int = 3) -> Dict[str, Any]:
        """Send a request to a configured endpoint, retrying on failure.

        Args:
            provider: name of the API provider.
            endpoint: name of the endpoint within that provider.
            params: request parameters; not modified by this call.
            timeout: per-attempt timeout in seconds.
            retries: number of additional attempts after the first one;
                negative values are treated as 0.

        Returns:
            Dict[str, Any]: the decoded JSON response; ``{'raw_response': text}``
            when the body is not JSON; or, once every attempt has failed with
            a ``requests`` exception, a dictionary with ``error``, ``message``,
            ``provider`` and ``endpoint`` keys.

        Raises:
            ValueError: if the provider or endpoint is not configured, a
                required parameter is missing, or the configured HTTP method
                is unsupported.
        """
        params = params or {}
        self._validate_params(provider, endpoint, params)
        prepared_params = self._prepare_params(provider, endpoint, params)
        endpoint_config = self._require_endpoint_config(provider, endpoint)
        method = endpoint_config['method'].upper()
        url = self._build_url(provider, endpoint, prepared_params)

        # Always make at least one attempt, even for a negative ``retries``.
        total_attempts = max(retries, 0) + 1
        for attempt in range(total_attempts):
            try:
                response = self._send_once(provider, method, url, prepared_params, timeout)
                response.raise_for_status()
                try:
                    result = response.json()
                except ValueError:
                    # Every JSON decoder used by ``requests`` raises a
                    # ValueError subclass for a non-JSON body.
                    result = {'raw_response': response.text}
                logger.info(f"API请求成功: {provider}.{endpoint}")
                return result
            except requests.exceptions.RequestException as e:
                logger.warning(f"API请求失败 (尝试 {attempt + 1}/{total_attempts}): {e}")
                if attempt == total_attempts - 1:
                    logger.error(f"API请求最终失败: {provider}.{endpoint}")
                    return {
                        'error': True,
                        'message': f"API请求失败: {str(e)}",
                        'provider': provider,
                        'endpoint': endpoint,
                    }
                time.sleep(2 ** attempt)  # exponential backoff

    # ------------------------------------------------------------------
    # Convenience wrappers
    # ------------------------------------------------------------------
    def wx_index(self, keyword):
        """Query the WeChat index for ``keyword`` via ``dajiala``/``web_search``."""
        data = {
            "keyword": keyword,
            "mode": 2,
            "BusinessType": 8192,
            "sub_search_type": 0,
            "verifycode": "",
        }
        return self.make_request('dajiala', 'web_search', data)

    def video_views(self, platform_type, video_id):
        """Query the play count of a video.

        Args:
            platform_type: 'douyin', 'bilibili' or 'kuaishou'.
            video_id: sent as ``bvid`` for bilibili, as ``videoId`` otherwise.
        """
        if platform_type == 'bilibili':
            params = {"bvid": video_id}
        else:
            params = {"videoId": video_id}
        # NOTE(review): the endpoint name is the literal string 'platform_type',
        # not the ``platform_type`` argument. This looks unintended - confirm
        # against the 'justoneapi' endpoint configuration before changing it.
        return self.make_request('justoneapi', 'platform_type', params)

    # Chinaz convenience methods
    def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
        """Query a company's software copyrights (first page, range 100)."""
        params = {
            'entName': company_name,
            'pageNo': '1',
            'range': '100',
            'ChinazVer': chinaz_ver,
        }
        return self.make_request('chinaz', 'copyright_software', params)

    def query_patent_info(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
        """Query a company's patent information (first page, range 100)."""
        params = {
            'searchKey': company_name,
            'pageNo': '1',
            'range': '100',
            'searchType': '1',
            'ChinazVer': chinaz_ver,
        }
        return self.make_request('chinaz', 'patent', params)

    def query_judicial_data(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
        """Query the combined judicial data of a company (first page)."""
        params = {
            'q': company_name,
            'pageNo': '1',
            'ChinazVer': chinaz_ver,
        }
        return self.make_request('chinaz', 'judgement', params)

    # Xiaohongshu convenience methods
    def get_xiaohongshu_note_detail(self, note_id: str) -> Dict[str, Any]:
        """Fetch the details of a Xiaohongshu note.

        Args:
            note_id: ID of the note.

        Returns:
            Dict[str, Any]: result of ``make_request`` for the note-detail endpoint.
        """
        params = {'noteId': note_id}
        return self.make_request('xiaohongshu', 'xiaohongshu_note_detail', params)

    # Jizhiliao convenience methods
    def search_jizhiliao_index(
        self,
        keyword: str,
        mode: int = 2,
        business_type: int = 8192,
        sub_search_type: int = 0,
        key: Optional[str] = None,
        verifycode: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run a Jizhiliao index search.

        ``key`` and ``verifycode`` are sent only when given; otherwise the
        configured defaults are applied while the parameters are prepared.
        """
        params: Dict[str, Any] = {
            'keyword': keyword,
            'mode': mode,
            'BusinessType': business_type,
            'sub_search_type': sub_search_type,
        }
        if key is not None:
            params['key'] = key
        if verifycode is not None:
            params['verifycode'] = verifycode
        return self.make_request('jizhiliao', 'index_search', params)

    # ------------------------------------------------------------------
    # Generic pass-throughs to the configuration object
    # ------------------------------------------------------------------
    def list_providers(self) -> list:
        """Return the providers reported by the configuration object."""
        return self.config.list_providers()

    def list_endpoints(self, provider: str) -> list:
        """Return the endpoints the configuration reports for ``provider``."""
        return self.config.list_endpoints(provider)

    def get_endpoint_info(self, provider: str, endpoint: str) -> Optional[Dict[str, Any]]:
        """Return the endpoint configuration, or ``None`` if it is unknown."""
        return self._get_endpoint_config(provider, endpoint)

    def set_api_key(self, provider: str, api_key: str) -> None:
        """Set the API key of ``provider`` on the configuration object."""
        self.config.set_api_key(provider, api_key)

    def add_endpoint(self,
                     provider: str,
                     endpoint_name: str,
                     path: str,
                     method: str = 'GET',
                     description: str = '',
                     required_params: list = None,
                     optional_params: list = None) -> None:
        """Register a new endpoint on the configuration object."""
        self.config.add_endpoint(
            provider=provider,
            endpoint_name=endpoint_name,
            path=path,
            method=method,
            description=description,
            required_params=required_params,
            optional_params=optional_params,
        )
# Module-level shared instance, created at import time.
universal_api = UniversalAPIManager()