guzhi/app/api/v1/third_party_api/third_party_api.py
2025-10-05 15:27:12 +08:00

114 lines
4.0 KiB
Python

import logging
from typing import List
from fastapi import APIRouter, Query, Body, HTTPException
from app.controllers.third_party_api import third_party_api_controller
from app.schemas.base import Success, Fail
from app.schemas.third_party_api import (
BaseAPIRequest,
ChinazAPIRequest,
XiaohongshuNoteRequest,
JizhiliaoSearchRequest,
APIResponse,
APIProviderInfo,
APIEndpointInfo,
APIListResponse
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the endpoints in this file register; the tag passed here is
# applied to every route added through it.
router = APIRouter(tags=["内置接口"])
# Chinaz API endpoints
@router.post(
    "/chinaz/copyright_software",
    summary="企业软件著作权查询",
    # FastAPI would otherwise publish the docstring as the description; pin
    # the original text so the generated API docs stay unchanged.
    description="查询企业软件著作权信息",
)
async def query_copyright_software(request: ChinazAPIRequest):
    """Query a company's software-copyright information.

    Args:
        request: Request body; its ``company_name`` and ``chinaz_ver`` fields
            are forwarded to the controller.

    Returns:
        ``Success`` with the controller result's data and message when the
        result reports success, otherwise ``Fail`` with the result's message.
        Any exception raised while handling the request is logged and
        converted into a ``Fail`` response instead of propagating.
    """
    try:
        result = await third_party_api_controller.query_copyright_software(
            company_name=request.company_name,
            chinaz_ver=request.chinaz_ver,
        )
        if result.success:
            return Success(data=result.data, message=result.message)
        return Fail(message=result.message)
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error discarded;
        # the rendered log message text is the same as before.
        logger.exception("查询企业软件著作权失败: %s", exc)
        return Fail(message=f"查询企业软件著作权失败: {exc}")
@router.post(
    "/chinaz/patent_info",
    summary="企业专利信息查询",
    # FastAPI would otherwise publish the docstring as the description; pin
    # the original text so the generated API docs stay unchanged.
    description="查询企业专利信息",
)
async def query_patent_info(request: ChinazAPIRequest):
    """Query a company's patent information.

    Args:
        request: Request body; its ``company_name`` and ``chinaz_ver`` fields
            are forwarded to the controller.

    Returns:
        ``Success`` with the controller result's data and message when the
        result reports success, otherwise ``Fail`` with the result's message.
        Any exception raised while handling the request is logged and
        converted into a ``Fail`` response instead of propagating.
    """
    try:
        result = await third_party_api_controller.query_patent_info(
            company_name=request.company_name,
            chinaz_ver=request.chinaz_ver,
        )
        if result.success:
            return Success(data=result.data, message=result.message)
        return Fail(message=result.message)
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error discarded;
        # the rendered log message text is the same as before.
        logger.exception("查询企业专利信息失败: %s", exc)
        return Fail(message=f"查询企业专利信息失败: {exc}")
@router.post(
    "/chinaz/judicial_data",
    summary="司法综合数据查询",
    # FastAPI would otherwise publish the docstring as the description; pin
    # the original text so the generated API docs stay unchanged.
    description="查询司法综合数据",
)
async def query_judicial_data(request: ChinazAPIRequest):
    """Query comprehensive judicial data for a company.

    Args:
        request: Request body; its ``company_name`` and ``chinaz_ver`` fields
            are forwarded to the controller.

    Returns:
        ``Success`` with the controller result's data and message when the
        result reports success, otherwise ``Fail`` with the result's message.
        Any exception raised while handling the request is logged and
        converted into a ``Fail`` response instead of propagating.
    """
    try:
        result = await third_party_api_controller.query_judicial_data(
            company_name=request.company_name,
            chinaz_ver=request.chinaz_ver,
        )
        if result.success:
            return Success(data=result.data, message=result.message)
        return Fail(message=result.message)
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error discarded;
        # the rendered log message text is the same as before.
        logger.exception("查询司法综合数据失败: %s", exc)
        return Fail(message=f"查询司法综合数据失败: {exc}")
# Xiaohongshu API endpoints
@router.post(
    "/xiaohongshu/note",
    summary="获取小红书笔记详情",
    # FastAPI would otherwise publish the docstring as the description; pin
    # the original text so the generated API docs stay unchanged.
    description="获取小红书笔记详情",
)
async def get_xiaohongshu_note(request: XiaohongshuNoteRequest):
    """Fetch the details of a Xiaohongshu note.

    Args:
        request: Request body; its ``note_id`` field is forwarded to the
            controller.

    Returns:
        ``Success`` with the controller result's data and message when the
        result reports success, otherwise ``Fail`` with the result's message.
        Any exception raised while handling the request is logged and
        converted into a ``Fail`` response instead of propagating.
    """
    try:
        result = await third_party_api_controller.get_xiaohongshu_note(
            note_id=request.note_id,
        )
        if result.success:
            return Success(data=result.data, message=result.message)
        return Fail(message=result.message)
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error discarded;
        # the rendered log message text is the same as before.
        logger.exception("获取小红书笔记失败: %s", exc)
        return Fail(message=f"获取小红书笔记失败: {exc}")
@router.post(
    "/jizhiliao/index_search",
    summary="极致聊指数搜索",
    # FastAPI would otherwise publish the docstring as the description; pin
    # the original text so the generated API docs stay unchanged.
    description="调用极致聊指数搜索接口",
)
async def jizhiliao_index_search(request: JizhiliaoSearchRequest):
    """Call the Jizhiliao index search interface.

    Args:
        request: Request body. Every field except ``timeout`` is serialized
            (by alias, skipping ``None`` values) into the search parameters;
            ``timeout`` is passed to the controller separately.

    Returns:
        ``Success`` with the controller result's data and message when the
        result reports success, otherwise ``Fail`` with the result's message.
        Any exception raised while handling the request is logged and
        converted into a ``Fail`` response instead of propagating.
    """
    try:
        # "timeout" is a transport setting rather than a search parameter, so
        # it is kept out of the serialized payload.
        params = request.model_dump(
            by_alias=True,
            exclude_none=True,
            exclude={"timeout"},
        )
        # Fall back to 30 when the caller left the timeout unset.
        # NOTE(review): unit is presumably seconds — confirm against the controller.
        timeout = request.timeout if request.timeout is not None else 30
        result = await third_party_api_controller.search_jizhiliao_index(
            params=params,
            timeout=timeout,
        )
        if result.success:
            return Success(data=result.data, message=result.message)
        return Fail(message=result.message)
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error discarded;
        # the rendered log message text is the same as before.
        logger.exception("极致聊指数搜索失败: %s", exc)
        return Fail(message=f"极致聊指数搜索失败: {exc}")