93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
import logging
|
|
from typing import List
|
|
|
|
from fastapi import APIRouter, Query, Body, HTTPException
|
|
|
|
from app.controllers.third_party_api import third_party_api_controller
|
|
from app.schemas.base import Success, Fail
|
|
from app.schemas.third_party_api import (
|
|
BaseAPIRequest,
|
|
ChinazAPIRequest,
|
|
XiaohongshuNoteRequest,
|
|
APIResponse,
|
|
APIProviderInfo,
|
|
APIEndpointInfo,
|
|
APIListResponse
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(tags=["内置接口"])
|
|
|
|
# 站长之家API端点
|
|
|
|
@router.post("/chinaz/copyright_software", summary="企业软件著作权查询")
|
|
async def query_copyright_software(request: ChinazAPIRequest):
|
|
"""查询企业软件著作权信息"""
|
|
try:
|
|
result = await third_party_api_controller.query_copyright_software(
|
|
company_name=request.company_name,
|
|
chinaz_ver=request.chinaz_ver
|
|
)
|
|
|
|
if result.success:
|
|
return Success(data=result.data, message=result.message)
|
|
else:
|
|
return Fail(message=result.message)
|
|
except Exception as e:
|
|
logger.error(f"查询企业软件著作权失败: {e}")
|
|
return Fail(message=f"查询企业软件著作权失败: {str(e)}")
|
|
|
|
|
|
@router.post("/chinaz/patent_info", summary="企业专利信息查询")
|
|
async def query_patent_info(request: ChinazAPIRequest):
|
|
"""查询企业专利信息"""
|
|
try:
|
|
result = await third_party_api_controller.query_patent_info(
|
|
company_name=request.company_name,
|
|
chinaz_ver=request.chinaz_ver
|
|
)
|
|
|
|
if result.success:
|
|
return Success(data=result.data, message=result.message)
|
|
else:
|
|
return Fail(message=result.message)
|
|
except Exception as e:
|
|
logger.error(f"查询企业专利信息失败: {e}")
|
|
return Fail(message=f"查询企业专利信息失败: {str(e)}")
|
|
|
|
|
|
@router.post("/chinaz/judicial_data", summary="司法综合数据查询")
|
|
async def query_judicial_data(request: ChinazAPIRequest):
|
|
"""查询司法综合数据"""
|
|
try:
|
|
result = await third_party_api_controller.query_judicial_data(
|
|
company_name=request.company_name,
|
|
chinaz_ver=request.chinaz_ver
|
|
)
|
|
|
|
if result.success:
|
|
return Success(data=result.data, message=result.message)
|
|
else:
|
|
return Fail(message=result.message)
|
|
except Exception as e:
|
|
logger.error(f"查询司法综合数据失败: {e}")
|
|
return Fail(message=f"查询司法综合数据失败: {str(e)}")
|
|
|
|
|
|
# 小红书API端点
|
|
@router.post("/xiaohongshu/note", summary="获取小红书笔记详情")
|
|
async def get_xiaohongshu_note(request: XiaohongshuNoteRequest):
|
|
"""获取小红书笔记详情"""
|
|
try:
|
|
result = await third_party_api_controller.get_xiaohongshu_note(
|
|
note_id=request.note_id
|
|
)
|
|
|
|
if result.success:
|
|
return Success(data=result.data, message=result.message)
|
|
else:
|
|
return Fail(message=result.message)
|
|
except Exception as e:
|
|
logger.error(f"获取小红书笔记失败: {e}")
|
|
return Fail(message=f"获取小红书笔记失败: {str(e)}") |