Compare commits

...

2 Commits

Author SHA1 Message Date
dby
967e0e429e Merge branch 'main' of https://git.1024tool.vip/zfc/guzhi 2025-10-10 21:55:06 +08:00
dby
a6d7d350e0 更新第三方接口数据和状态 2025-10-10 21:54:47 +08:00
5 changed files with 186 additions and 135 deletions

View File

@ -1,3 +1,5 @@
from random import random
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
import json import json
@ -100,7 +102,8 @@ async def calculate_valuation(
""" """
try: try:
start_ts = time.monotonic() start_ts = time.monotonic()
logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None)) logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id,
getattr(data, 'asset_name', None), getattr(data, 'industry', None))
# 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code # 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code
esg_obj = None esg_obj = None
@ -135,10 +138,21 @@ async def calculate_valuation(
# 政策匹配度 # 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score input_data_by_b1["policy_match_score"] = policy_match_score
# 侵权分 默认 6
try:
judicial_data = universal_api.query_judicial_data(data.institution)
_data = judicial_data["data"]["count"]["ktggCount"]
if _data > 0:
infringement_score = 4.0
else:
infringement_score = 10.0
except:
infringement_score = 0.0
input_data_by_b1["infringement_score"] = infringement_score
# 获取专利信息 TODO 参数 # 获取专利信息 TODO 参数
try: try:
patent_data = universal_api.query_patent_info("未找到 企业名称、企业统代、企业注册号") patent_data = universal_api.query_patent_info(data.industry)
patent_dict = patent_data if isinstance(patent_data, dict) else {} patent_dict = patent_data if isinstance(patent_data, dict) else {}
inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {} inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {}
data_list = inner_data.get("dataList", []) data_list = inner_data.get("dataList", [])
@ -148,7 +162,8 @@ async def calculate_valuation(
# 发展潜力D相关参数 专利数量 # 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合 # 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] matched = [item for item in data_list if
isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
if matched: if matched:
patent_count = calculate_patent_usage_score(len(matched)) patent_count = calculate_patent_usage_score(len(matched))
input_data_by_b1["patent_count"] = float(patent_count) input_data_by_b1["patent_count"] = float(patent_count)
@ -163,10 +178,16 @@ async def calculate_valuation(
input_data_by_b2 = await _extract_calculation_params_b2(data) input_data_by_b2 = await _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数 # 提取 风险调整系数B3 计算参数
input_data_by_b3 = await _extract_calculation_params_b3(data) input_data_by_b3 = await _extract_calculation_params_b3(data)
if infringement_score == 10.0:
input_data_by_b3["lawsuit_status"] = "无诉讼状态"
if 0 < infringement_score < 4.0:
input_data_by_b3["lawsuit_status"] = "已解决诉讼"
else:
input_data_by_b3["lawsuit_status"] = "未解决诉讼"
# 提取 市场估值C 参数 # 提取 市场估值C 参数
input_data_by_c = await _extract_calculation_params_c(data) input_data_by_c = await _extract_calculation_params_c(data)
input_data = { input_data = {
# 模型估值B 相关参数 # 模型估值B 相关参数
"model_data": { "model_data": {
@ -221,7 +242,8 @@ async def calculate_valuation(
}, },
'market_data': list(input_data.get('market_data', {}).keys()), 'market_data': list(input_data.get('market_data', {}).keys()),
}, },
drp_result=drp_result drp_result=drp_result,
status='approved' # 计算成功设置为approved状态
) )
# 组装返回 # 组装返回
@ -248,6 +270,21 @@ async def calculate_valuation(
import traceback import traceback
print(traceback.format_exc()) print(traceback.format_exc())
logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e)) logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e))
# 计算失败时也创建记录状态设置为failed
try:
result = await user_valuation_controller.create_valuation(
user_id=user_id,
data=data,
calculation_result=None,
calculation_input=None,
drp_result=None,
status='rejected' # 计算失败设置为rejected状态
)
logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id)
except Exception as create_error:
logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error))
raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}") raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}")
@ -269,10 +306,6 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 普及地域分值 默认 7分 # 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage) popularity_score = calculate_popularity_score(data.application_coverage)
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0
# 创新投入比 = (研发费用/营收) * 100 # 创新投入比 = (研发费用/营收) * 100
try: try:
rd_investment = float(data.rd_investment or 0) rd_investment = float(data.rd_investment or 0)
@ -281,14 +314,12 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
except (ValueError, TypeError): except (ValueError, TypeError):
innovation_ratio = 0.0 innovation_ratio = 0.0
# 流量因子B12相关参数 # 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = 0 baidu_index = 0
# 获取微信指数并计算近30天平均值 # 获取微信指数并计算近30天平均值
try: try:
1/0
wechat_index_response = universal_api.wx_index(data.asset_name) wechat_index_response = universal_api.wx_index(data.asset_name)
wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response) wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response)
logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}") logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}")
@ -318,7 +349,8 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
funding_support_str = data.funding_status or "无资助" funding_support_str = data.funding_status or "无资助"
# 使用PolicyMultiplierB13Calculator来计算评分 # 使用PolicyMultiplierB13Calculator来计算评分
from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import PolicyMultiplierB13Calculator from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \
PolicyMultiplierB13Calculator
policy_calculator = PolicyMultiplierB13Calculator() policy_calculator = PolicyMultiplierB13Calculator()
implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str) implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str)
funding_support = policy_calculator.calculate_funding_support_score(funding_support_str) funding_support = policy_calculator.calculate_funding_support_score(funding_support_str)
@ -327,7 +359,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 基础价值B11相关参数 # 基础价值B11相关参数
'three_year_income': three_year_income, 'three_year_income': three_year_income,
'popularity_score': popularity_score, 'popularity_score': popularity_score,
'infringement_score': infringement_score,
'innovation_ratio': innovation_ratio, 'innovation_ratio': innovation_ratio,
# 流量因子B12相关参数 # 流量因子B12相关参数
@ -347,6 +379,7 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
'funding_support': funding_support 'funding_support': funding_support
} }
# 获取 文化价值B2 相关参数 # 获取 文化价值B2 相关参数
async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]: async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]:
""" """
@ -357,7 +390,8 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
Dict: 计算所需的参数字典 Dict: 计算所需的参数字典
""" """
# 导入计算器来转换传承人等级 # 导入计算器来转换传承人等级
from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import LivingHeritageB21Calculator from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \
LivingHeritageB21Calculator
# 活态传承系数B21 县官参数 # 活态传承系数B21 县官参数
living_heritage_calculator = LivingHeritageB21Calculator() living_heritage_calculator = LivingHeritageB21Calculator()
@ -390,16 +424,18 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
"normalized_entropy": normalized_entropy, "normalized_entropy": normalized_entropy,
} }
# 获取 文化价值B2 相关参数 # 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 # 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型
highest_price,lowest_price= data.price_fluctuation price_fluctuation = [float(i) for i in data.price_fluctuation]
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation)
# lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return { return {
"highest_price": highest_price, "highest_price": highest_price,
"lowest_price": lowest_price, "lowest_price": lowest_price,
"lawsuit_status": lawsuit_status,
"inheritor_ages": inheritor_ages, "inheritor_ages": inheritor_ages,
} }
@ -412,7 +448,8 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# expert_valuations: 专家估值列表(系统配置) # expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None transaction_data: Dict = None
manual_bids: List[float] = None manual_bids: List[float] = None
expert_valuations: List[float] = None # TODO 需要客户确认 三个数值
expert_valuations = None
# 浏览热度分 TODO 需要先确定平台信息 # 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位) collection_count = 50 # 收藏数(默认占位)
@ -437,6 +474,7 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
"recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C "recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C
} }
@app_valuations_router.get("/", summary="获取我的估值评估列表") @app_valuations_router.get("/", summary="获取我的估值评估列表")
async def get_my_valuations( async def get_my_valuations(
query: UserValuationQuery = Depends(), query: UserValuationQuery = Depends(),

View File

@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
import json import json
import asyncio import asyncio
import time import time,random
from app.controllers.user_valuation import user_valuation_controller from app.controllers.user_valuation import user_valuation_controller
from app.schemas.valuation import ( from app.schemas.valuation import (
@ -28,6 +28,7 @@ from app.models.esg import ESG
from app.models.industry import Industry from app.models.industry import Industry
from app.models.policy import Policy from app.models.policy import Policy
from app.utils.universal_api_manager import universal_api from app.utils.universal_api_manager import universal_api
from app.utils.wechat_index_calculator import wechat_index_calculator
app_valuations_router = APIRouter(tags=["用户端估值评估"]) app_valuations_router = APIRouter(tags=["用户端估值评估"])
@ -146,7 +147,6 @@ async def calculate_valuation(
data_list = data_list if isinstance(data_list, list) else [] data_list = data_list if isinstance(data_list, list) else []
# 验证 专利剩余年限 # 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分) # TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量 # 发展潜力D相关参数 专利数量
# 查询匹配申请号的记录集合 # 查询匹配申请号的记录集合
matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)]
@ -277,9 +277,9 @@ async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str,
# 流量因子B12相关参数 # 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
baidu_index = "暂无" baidu_index = 0.0
wechat_index = universal_api.wx_index(data.asset_name) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息 wechat_index = wechat_index_calculator.process_wechat_index_response(universal_api.wx_index(data.asset_name)) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息
weibo_index = "暂无" weibo_index = 0.0
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取 search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
# 行业均值S2 TODO 系统内置 未找到相关内容 # 行业均值S2 TODO 系统内置 未找到相关内容
industry_average_s2 = 0.0 industry_average_s2 = 0.0
@ -367,8 +367,9 @@ async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str,
# 获取 文化价值B2 相关参数 # 获取 文化价值B2 相关参数
async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
# 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 # 过去30天最高价格 过去30天最低价格
highest_price,lowest_price= data.price_fluctuation price_fluctuation = [float(i) for i in data.price_fluctuation ]
highest_price,lowest_price= max(price_fluctuation), min(price_fluctuation)
lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取)
inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表
return { return {
@ -387,7 +388,7 @@ async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str,
# expert_valuations: 专家估值列表(系统配置) # expert_valuations: 专家估值列表(系统配置)
transaction_data: Dict = None transaction_data: Dict = None
manual_bids: List[float] = None manual_bids: List[float] = None
expert_valuations: List[float] = None expert_valuations = [random.uniform(0, 1) for _ in range(3)]
# 浏览热度分 TODO 需要先确定平台信息 # 浏览热度分 TODO 需要先确定平台信息
daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位)
collection_count = 50 # 收藏数(默认占位) collection_count = 50 # 收藏数(默认占位)

View File

@ -18,11 +18,11 @@ class UserValuationController:
def __init__(self): def __init__(self):
self.model = ValuationAssessment self.model = ValuationAssessment
async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None) -> UserValuationDetail: async def create_valuation(self, user_id: int, data: UserValuationCreate, calculation_result: dict = None, calculation_input: dict = None, drp_result: float = None, status: str = 'pending') -> UserValuationDetail:
"""用户创建估值评估""" """用户创建估值评估"""
valuation_data = data.model_dump() valuation_data = data.model_dump()
valuation_data['user_id'] = user_id valuation_data['user_id'] = user_id
valuation_data['status'] = 'pending' # 默认状态为待审核 valuation_data['status'] = status # 根据计算结果显示设置状态
# 添加计算结果到数据库 # 添加计算结果到数据库
if calculation_result: if calculation_result:

View File

@ -111,21 +111,21 @@ class APIConfig:
"path": "/api/douyin/get-video-detail/v2", "path": "/api/douyin/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定抖音视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"], "required_params": ["videoId"],
"optional_params": [] "optional_params": []
}, },
"kuaishou_video_detail": { "kuaishou_video_detail": {
"path": "/api/kuaishou/get-video-detail/v2", "path": "/api/kuaishou/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定快手视频的详细信息,包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "videoId"], "required_params": ["videoId"],
"optional_params": [] "optional_params": []
}, },
"bilibili_video_detail": { "bilibili_video_detail": {
"path": "/api/bilibili/get-video-detail/v2", "path": "/api/bilibili/get-video-detail/v2",
"method": "GET", "method": "GET",
"description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等", "description": "该接口用于获取指定B站视频的详细信息包括视频地址、描述文案、作者信息、发布时间、播放量、点赞数、评论数与分享数等",
"required_params": ["token", "bvid"], "required_params": [ "bvid"],
"optional_params": [] "optional_params": []
} }
} }

View File

@ -14,8 +14,10 @@ from typing import Dict, Any, Optional, Union
import json import json
import time import time
import urllib.parse import urllib.parse
from .api_config import api_config from .api_config import api_config
# 配置日志 # 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,7 +64,8 @@ class UniversalAPIManager:
provider_config = self._get_provider_config(provider) provider_config = self._get_provider_config(provider)
return provider_config.get('api_key') return provider_config.get('api_key')
def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[str, Any]: def make_request(self, provider: str, endpoint: str, params: Dict[str, Any], timeout: Optional[int] = None) -> Dict[
str, Any]:
""" """
发送API请求 发送API请求
@ -260,7 +263,6 @@ class UniversalAPIManager:
# 这里不需要修改paramsAuthorization会在make_request中处理 # 这里不需要修改paramsAuthorization会在make_request中处理
pass pass
return prepared_params return prepared_params
def make_request(self, def make_request(self,
@ -389,7 +391,6 @@ class UniversalAPIManager:
return self.make_request('justoneapi', 'platform_type', params) return self.make_request('justoneapi', 'platform_type', params)
# 站长之家API的便捷方法 # 站长之家API的便捷方法
def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]: def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]:
"""查询企业软件著作权""" """查询企业软件著作权"""
@ -460,6 +461,15 @@ class UniversalAPIManager:
return self.make_request('jizhiliao', 'index_search', params) return self.make_request('jizhiliao', 'index_search', params)
def douyin_video_detail(self, video_id: str) -> Dict[str, Any]:
"""抖音 视频详情接口"""
params = {
'videoId': video_id,
}
return self.make_request('justoneapi', 'douyin_video_detail', params)
# 通用方法 # 通用方法
def list_providers(self) -> list: def list_providers(self) -> list:
"""列出所有可用的API提供商""" """列出所有可用的API提供商"""
@ -498,4 +508,6 @@ class UniversalAPIManager:
# 创建全局实例 # 创建全局实例
universal_api = UniversalAPIManager() universal_api = UniversalAPIManager()