up 市场估值C wx index 专利

This commit is contained in:
dubingyan666 2025-10-05 17:37:40 +08:00
parent c61f000292
commit e68610bd54
11 changed files with 594 additions and 17 deletions

View File

@ -23,7 +23,7 @@ from app.utils.calculation_engine.risk_adjustment_b3 import RiskAdjustmentB3Calc
from app.models.esg import ESG
from app.models.industry import Industry
from app.models.policy import Policy
from app.utils.universal_api_manager import universal_api
app_valuations_router = APIRouter(tags=["用户端估值评估"])
@ -66,10 +66,33 @@ async def calculate_valuation(
# 政策匹配度
input_data_by_b1["policy_match_score"] = policy_match_score
# 获取专利信息 TODO 参数
try:
patent_data = universal_api.query_patent_info("未找到 企业名称、企业统代、企业注册号")
# 解析专利数量
patentData = patent_data.get("data", {"dataList":[]})
patent_data_num = len(patentData["dataList"])
# 查询专利是否存在
patent_data_al = list(filter(lambda x: x.get("SQH") == data.patent_application_no, patentData))
if len(patent_data_al) > 0:
# 验证 专利剩余年限
# TODO 无法验证 专利剩余保护期>10年(10分)5-10年(7分)<5年(3分)
# 发展潜力D相关参数 专利数量
patent_count = calculate_patent_usage_score(len(patent_data_al))
input_data_by_b1["patent_count"] = patent_count
else:
input_data_by_b1["patent_count"] = 0
except Exception as e:
input_data_by_b1["patent_count"] = 0
# 提取 文化价值B2 计算参数
input_data_by_b2 = _extract_calculation_params_b2(data)
# 提取 风险调整系数B3 计算参数
input_data_by_b3 = _extract_calculation_params_b3(data)
# 提取 市场估值C 参数
input_data_by_c = _extract_calculation_params_c(data)
# 调用 经济价值B1 计算引擎
calculation_result_by_b1 = calculator_by_b1.calculate_complete_economic_value_b1(input_data_by_b1)
@ -111,19 +134,11 @@ def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
# 法律强度L相关参数
# 普及地域分值 默认 7分
popularity_score = calculate_popularity_score(data.application_coverage)
# TODO 专利剩余年限【专利证书】 需要使用第三方API
patent = data.patent_application_no
patent_score = 0
# 侵权分 默认 6 TODO 需要使用第三方API 无侵权记录(10分),历史侵权已解决(6分),现存纠纷(2分)
# infringement_score = calculate_infringement_score()
infringement_score = calculate_infringement_score("无侵权信息")
infringement_score = 0
# 发展潜力D相关参数
# 专利使用量 TODO 需要使用第三方API
patent_score = calculate_patent_usage_score(0)
# 创新投入比 = (研发费用/营收) * 100
try:
rd_investment = float(data.rd_investment or 0)
@ -132,10 +147,13 @@ def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
except (ValueError, TypeError):
innovation_ratio = 0.0
#
# 流量因子B12相关参数
# 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API
search_index_s1 = calculate_search_index_s1() # 默认值实际应从API获取
baidu_index = "暂无"
wechat_index = universal_api.wx_index(data.asset_name) # 通过资产信息获取微信指数 TODO 这里返回的没确认指数参数,有可能返回的图示是指数信息
weibo_index = "暂无"
search_index_s1 = calculate_search_index_s1(baidu_index,wechat_index,weibo_index) # 默认值实际应从API获取
# 行业均值S2 TODO 系统内置 未找到相关内容
industry_average_s2 = 0.0
# 社交媒体传播度S3 - TODO 需要使用第三方API,click_count view_count 未找到对应参数
@ -156,7 +174,6 @@ def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]:
return {
# 基础价值B11相关参数
'three_year_income': three_year_income,
'patent_score': patent_score,
'popularity_score': popularity_score,
'infringement_score': infringement_score,
'innovation_ratio': innovation_ratio,
@ -227,3 +244,13 @@ def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]:
"lawsuit_status": lawsuit_status,
"inheritor_ages": inheritor_ages,
}
# 获取 市场估值C 相关参数
def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, Any]:
# 稀缺性乘数C3 发行量
circulation = data.circulation\
# 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间
last_market_activity = data.last_market_activity

View File

@ -60,7 +60,7 @@ class APIConfig:
"base_url": "https://api.justoneapi.com",
"timeout": 30,
"retries": 3,
"endpoints": {
"endpoints": {
"xiaohongshu_note_detail": {
"path": "/api/xiaohongshu/get-note-detail/v7",
"method": "GET",
@ -84,8 +84,25 @@ class APIConfig:
}
}
},
# 微信指数
"dajiala": {
"api_key": "",
"base_url": "https://www.dajiala.com",
"timeout": 30,
"retries": 3,
"endpoints": {
"web_search":{
"path": "/fbmain/monitor/v3/web_search",
"method": "POST",
"description": "获取微信指数",
"required_params": ["keyword","key"],
"optional_params": []
}
}
},
"other_apis": {
# 可以添加其他第三方API配置
"example_api": {
"api_key": os.getenv("EXAMPLE_API_KEY", ""),
"base_url": "https://api.example.com",

View File

@ -72,12 +72,12 @@ class EconomicValueB1Calculator:
input_data["infringement_score"],
)
# 发展潜力 patent_score: 专利分 (0-10分) (用户填写)
# 发展潜力 patent_count: 专利分 (0-10分) (用户填写)
# esg_score: ESG分 (0-10分) (用户填写)
# innovation_ratio: 创新投入比 (研发费用/营收) * 100 (用户填写)
development_potential = self.basic_value_calculator.calculate_development_potential_d(
input_data["esg_score"],
input_data["patent_score"],
input_data["patent_count"],
input_data["innovation_ratio"],
)
# 计算行业系数I target_industry_roe: 目标行业平均ROE (系统配置)

View File

@ -0,0 +1,16 @@
from .market_value_c import MarketValueCCalculator
from .sub_formulas import (
MarketBiddingC1Calculator,
HeatCoefficientC2Calculator,
ScarcityMultiplierC3Calculator,
TemporalDecayC4Calculator
)
__all__ = [
"MarketValueCCalculator",
"MarketBiddingC1Calculator",
"HeatCoefficientC2Calculator",
"ScarcityMultiplierC3Calculator",
"TemporalDecayC4Calculator"
]

View File

@ -0,0 +1,145 @@
from typing import Dict, List, Optional
try:
# 相对导入(当作为包使用时)
from .sub_formulas.market_bidding_c1 import MarketBiddingC1Calculator
from .sub_formulas.heat_coefficient_c2 import HeatCoefficientC2Calculator
from .sub_formulas.scarcity_multiplier_c3 import ScarcityMultiplierC3Calculator
from .sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
except ImportError:
# 绝对导入(当直接运行时)
from sub_formulas.market_bidding_c1 import MarketBiddingC1Calculator
from sub_formulas.heat_coefficient_c2 import HeatCoefficientC2Calculator
from sub_formulas.scarcity_multiplier_c3 import ScarcityMultiplierC3Calculator
from sub_formulas.temporal_decay_c4 import TemporalDecayC4Calculator
class MarketValueCCalculator:
"""市场估值C计算器"""
def __init__(self):
"""初始化计算器"""
self.market_bidding_calculator = MarketBiddingC1Calculator()
self.heat_coefficient_calculator = HeatCoefficientC2Calculator()
self.scarcity_multiplier_calculator = ScarcityMultiplierC3Calculator()
self.temporal_decay_calculator = TemporalDecayC4Calculator()
def calculate_market_value_c(self,
market_bidding_c1: float,
heat_coefficient_c2: float,
scarcity_multiplier_c3: float,
temporal_decay_c4: float) -> float:
"""
计算市场估值C
市场估值C = 市场竞价C1 × 热度系数C2 × 稀缺性乘数C3 × 时效性衰减C4
args:
market_bidding_c1: 市场竞价C1 (系统计算)
heat_coefficient_c2: 热度系数C2 (系统计算)
scarcity_multiplier_c3: 稀缺性乘数C3 (系统计算)
temporal_decay_c4: 时效性衰减C4 (系统计算)
return:
float: 市场估值C
"""
market_value = (market_bidding_c1 * heat_coefficient_c2 *
scarcity_multiplier_c3 * temporal_decay_c4)
return market_value
def calculate_complete_market_value_c(self, input_data: Dict) -> Dict:
"""
计算完整的市场估值C包含所有子公式
args:
input_data: 输入数据字典包含所有必要的参数
参数来源标记用户填写/系统配置/API获取/系统计算
- average_transaction_price: 系统计算(基于用户填写/API获取)
- market_activity_coefficient: 系统计算(基于用户填写)
- daily_browse_volume: API获取/系统估算
- collection_count: API获取/系统估算
- issuance_level: 用户填写
- recent_market_activity: 用户填写
- issuance_scarcity/circulation_scarcity/uniqueness_scarcity: 系统配置/系统计算保留向后兼容
return:
Dict: 包含所有中间计算结果和最终结果的字典
"""
# 计算市场竞价C1
market_bidding_c1 = self.market_bidding_calculator.calculate_market_bidding_c1(
transaction_data={'weighted_average_price': input_data.get('average_transaction_price', 50000.0)},
manual_bids=[input_data.get('average_transaction_price', 50000.0)],
expert_valuations=[input_data.get('average_transaction_price', 50000.0)]
)
# 计算热度系数C2
heat_coefficient_c2 = self.heat_coefficient_calculator.calculate_heat_coefficient_c2(
input_data.get('daily_browse_volume', 500.0),
input_data.get('collection_count', 50)
)
# 计算稀缺性乘数C3
scarcity_multiplier_c3 = self.scarcity_multiplier_calculator.calculate_scarcity_multiplier_c3(
input_data.get('issuance_level', '限量发行')
)
# 计算时效性衰减C4
temporal_decay_c4 = self.temporal_decay_calculator.calculate_temporal_decay_c4(
input_data.get('recent_market_activity', '近一月')
)
# 计算市场估值C
market_value_c = self.calculate_market_value_c(
market_bidding_c1,
heat_coefficient_c2,
scarcity_multiplier_c3,
temporal_decay_c4
)
return {
'market_bidding_c1': market_bidding_c1,
'heat_coefficient_c2': heat_coefficient_c2,
'scarcity_multiplier_c3': scarcity_multiplier_c3,
'temporal_decay_c4': temporal_decay_c4,
'market_value_c': market_value_c
}
# 示例使用
if __name__ == "__main__":
# 创建计算器实例
calculator = MarketValueCCalculator()
# 示例数据
input_data = {
# 市场竞价C1相关参数
'average_transaction_price': 50000.0, # 平均交易价格
'market_activity_coefficient': 0.8, # 市场活跃度系数
# 热度系数C2相关参数
'search_heat': 0.7, # 搜索热度
'social_media_heat': 0.6, # 社交媒体热度
'media_coverage_heat': 0.5, # 媒体报道热度
# 稀缺性乘数C3相关参数
'issuance_scarcity': 0.8, # 发行量稀缺性
'circulation_scarcity': 0.7, # 流通率稀缺性
'uniqueness_scarcity': 0.9, # 独特性稀缺性
# 时效性衰减C4相关参数
'time_decay_coefficient': 0.3, # 时间衰减系数
'market_activity_decay': 0.2 # 市场活跃度衰减
}
# 计算市场估值C
result = calculator.calculate_complete_market_value_c(input_data)
print("市场估值C计算结果:")
print(f"市场竞价C1: {result['market_bidding_c1']:.2f}")
print(f"热度系数C2: {result['heat_coefficient_c2']:.4f}")
print(f"稀缺性乘数C3: {result['scarcity_multiplier_c3']:.4f}")
print(f"时效性衰减C4: {result['temporal_decay_c4']:.4f}")
print(f"市场估值C: {result['market_value_c']:.2f}")

View File

@ -0,0 +1,13 @@
from .market_bidding_c1 import MarketBiddingC1Calculator
from .heat_coefficient_c2 import HeatCoefficientC2Calculator
from .scarcity_multiplier_c3 import ScarcityMultiplierC3Calculator
from .temporal_decay_c4 import TemporalDecayC4Calculator
__all__ = [
"MarketBiddingC1Calculator",
"HeatCoefficientC2Calculator",
"ScarcityMultiplierC3Calculator",
"TemporalDecayC4Calculator"
]

View File

@ -0,0 +1,87 @@
class HeatCoefficientC2Calculator:
"""热度系数C2计算器"""
def __init__(self):
"""初始化计算器"""
pass
def calculate_heat_coefficient_c2(self,
daily_browse_volume: float,
collection_count: int) -> float:
"""
计算热度系数C2
热度系数C2 = 1 + 浏览热度分
args:
daily_browse_volume: 近7日日均浏览量 (API获取)
collection_count: 收藏数 (API获取)
return:
float: 热度系数C2
"""
# 计算浏览热度分
browse_heat_score = self.calculate_browse_heat_score(daily_browse_volume, collection_count)
heat_coefficient = 1 + browse_heat_score
return heat_coefficient
def calculate_browse_heat_score(self, daily_browse_volume: float, collection_count: int) -> float:
"""
计算浏览热度分
根据所查询到的浏览量匹配热度分
高热度 近7日日均浏览量 > 1000或收藏数 > 100 1.0
中热度 近7日日均浏览量 [100, 1000]或收藏数 [20, 100] 0.6
低热度 近7日日均浏览量 < 100且收藏数 < 20 0.2
无数据 无任何浏览与互动数据 0.0默认值
args:
daily_browse_volume: 近7日日均浏览量 (API获取)
collection_count: 收藏数 (API获取)
return:
float: 浏览热度分
"""
# 高热度条件
if daily_browse_volume > 1000 or collection_count > 100:
return 1.0
# 中热度条件
elif (daily_browse_volume >= 100 and daily_browse_volume <= 1000) or \
(collection_count >= 20 and collection_count <= 100):
return 0.6
# 低热度条件
elif daily_browse_volume < 100 and collection_count < 20:
return 0.2
# 无数据
else:
return 0.0
# 示例使用
if __name__ == "__main__":
# 创建计算器实例
calculator = HeatCoefficientC2Calculator()
# 示例数据
daily_browse_volume = 500.0 # 近7日日均浏览量 (API获取)
collection_count = 50 # 收藏数 (API获取)
# 计算热度系数C2
heat_coefficient_c2 = calculator.calculate_heat_coefficient_c2(
daily_browse_volume, collection_count
)
print(f"近7日日均浏览量: {daily_browse_volume}")
print(f"收藏数: {collection_count}")
print(f"浏览热度分: {calculator.calculate_browse_heat_score(daily_browse_volume, collection_count):.1f}")
print(f"热度系数C2: {heat_coefficient_c2:.4f}")

View File

@ -0,0 +1,92 @@
from typing import Dict, List
class MarketBiddingC1Calculator:
"""市场竞价C1计算器"""
def __init__(self):
"""初始化计算器"""
pass
def calculate_market_bidding_c1(self,
transaction_data: Dict = None,
manual_bids: List[float] = None,
expert_valuations: List[float] = None) -> float:
"""
计算市场竞价C1
1)取同一资产或同品类同等级资产最近3个月内成功交易的加权平均价格
2)人工采集主流文化产权交易所拍卖平台如阿里拍卖京东拍卖上相似资产的当前报价或起拍价取其中位数
3)若以上数据均缺失由系统内置专家库中随机指派3位专家独立给出估值取平均值
args:
transaction_data: 交易数据字典 (API获取)
manual_bids: 手动收集的竞价列表 (用户填写)
expert_valuations: 专家估值列表 (系统配置)
retusn:
float: 市场竞价C1
"""
# 优先级1近3个月交易数据
if transaction_data and transaction_data.get('weighted_average_price'):
return transaction_data['weighted_average_price']
# 优先级2手动收集的竞价数据
if manual_bids and len(manual_bids) > 0:
# 取中位数
sorted_bids = sorted(manual_bids)
n = len(sorted_bids)
if n % 2 == 0:
return (sorted_bids[n//2-1] + sorted_bids[n//2]) / 2
else:
return sorted_bids[n//2]
# 优先级3专家估值
if expert_valuations and len(expert_valuations) > 0:
return sum(expert_valuations) / len(expert_valuations)
# 默认值
return 0.0
# 示例使用
if __name__ == "__main__":
# 创建计算器实例
calculator = MarketBiddingC1Calculator()
# 示例数据
asset_type = "传统工艺" # 资产类型 (用户填写)
time_period = "近一年" # 时间周期 (用户填写)
transaction_volume = 500 # 交易量 (API获取)
transaction_frequency = "中频" # 交易频率 (用户填写)
market_liquidity = "" # 市场流动性 (用户填写)
# 获取平均交易价格
average_transaction_price = calculator.get_average_transaction_price(asset_type, time_period)
# 计算市场活跃度系数
market_activity_coefficient = calculator.calculate_market_activity_coefficient(
transaction_volume, transaction_frequency, market_liquidity
)
# 示例数据
# 优先级1近3个月交易数据
transaction_data = {"weighted_average_price": 1000.0} # API获取
# 优先级2手动收集的竞价数据
manual_bids = [950.0, 1000.0, 1050.0, 1100.0] # 用户填写
# 优先级3专家估值
expert_valuations = [980.0, 1020.0, 990.0] # 系统配置
# 计算市场竞价C1
market_bidding_c1 = calculator.calculate_market_bidding_c1(
transaction_data, manual_bids, expert_valuations
)
print(f"近3个月交易数据: {transaction_data}")
print(f"手动收集竞价: {manual_bids}")
print(f"专家估值: {expert_valuations}")
print(f"市场竞价C1: {market_bidding_c1:.2f}")

View File

@ -0,0 +1,71 @@
class ScarcityMultiplierC3Calculator:
"""稀缺性乘数C3计算器"""
def __init__(self):
"""初始化计算器"""
pass
def calculate_scarcity_multiplier_c3(self, issuance_level: str) -> float:
"""
计算稀缺性乘数C3
稀缺性乘数C3 = 1 + 稀缺等级分
args:
issuance_level: 资产发行等级 (用户选择)
Returns:
float: 稀缺性乘数C3
"""
# 计算稀缺等级分
scarcity_level_score = self.calculate_scarcity_level_score(issuance_level)
scarcity_multiplier = 1 + scarcity_level_score
return scarcity_multiplier
def calculate_scarcity_level_score(self, issuance_level: str) -> float:
"""
计算稀缺等级分
根据用户所选择的资产发行量匹配稀缺等级
孤品 全球唯一不可复制如特定版权唯一实物 1.0
限量 总发行份数 100 0.7
稀有 总发行份数 (100, 1000]或二级市场流通率 < 5% 0.4
流通 总发行份数 > 1000或二级市场流通率 5% 0.1
args:
issuance_level: 资产发行等级 (用户选择)
Returns:
float: 稀缺等级分
"""
scarcity_scores = {
"孤品": 1.0,
"限量": 0.7,
"稀有": 0.4,
"流通": 0.1
}
return scarcity_scores.get(issuance_level, 0.1)
# 示例使用
if __name__ == "__main__":
# 创建计算器实例
calculator = ScarcityMultiplierC3Calculator()
# 示例数据
issuance_level = "限量" # 资产发行等级 (用户选择)
# 计算稀缺性乘数C3
scarcity_multiplier_c3 = calculator.calculate_scarcity_multiplier_c3(issuance_level)
print(f"资产发行等级: {issuance_level}")
print(f"稀缺等级分: {calculator.calculate_scarcity_level_score(issuance_level):.1f}")
print(f"稀缺性乘数C3: {scarcity_multiplier_c3:.4f}")

View File

@ -0,0 +1,89 @@
from datetime import datetime, timedelta
class TemporalDecayC4Calculator:
"""时效性衰减C4计算器"""
def __init__(self):
"""初始化计算器"""
pass
def calculate_temporal_decay_c4(self, time_elapsed: str) -> float:
"""
计算时效性衰减C4
近一周1.0
近一月0.7
近一年0.4
其他0.1
基于最近一次市场活动交易报价鉴定距今天数
args:
time_elapsed: 距离最近市场活动的时间 (用户选择)
retuen:
float: 时效性衰减C4
"""
timeliness_coefficients = {
"近一周": 1.0,
"近一月": 0.7,
"近一年": 0.4,
"其他": 0.1
}
return timeliness_coefficients.get(self.classify_date_distance(time_elapsed), 0.1)
@staticmethod
def classify_date_distance(target_date):
"""
计算日期距离今天是近一周近一月近一年还是其他
target_date: 目标日期可以是字符串或datetime对象
字符串格式支持'YYYY-MM-DD', 'YYYY/MM/DD', 'YYYYMMDD'
"""
# 获取当前日期
today = datetime.now().date()
# 处理输入日期
if isinstance(target_date, str):
# 尝试不同的日期格式
formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d']
date_obj = None
for fmt in formats:
try:
date_obj = datetime.strptime(target_date, fmt).date()
break
except ValueError:
continue
if date_obj is None:
raise ValueError(f"无法解析日期: {target_date}")
elif isinstance(target_date, datetime):
date_obj = target_date.date()
else:
date_obj = target_date
# 计算日期差
delta = today - date_obj
# 分类判断
if delta.days <= 7:
return "近一周"
elif delta.days <= 30:
return "近一月"
elif delta.days <= 365:
return "近一年"
else:
return "其他"
# 示例使用
if __name__ == "__main__":
# 创建计算器实例
calculator = TemporalDecayC4Calculator()
# 示例数据
time_elapsed = "2024-01-15" # 距离最近市场活动的时间 (用户选择)
# 计算时效性衰减C4
temporal_decay_c4 = calculator.calculate_temporal_decay_c4(time_elapsed)
print(f"距离最近市场活动时间: {time_elapsed}")
print(f"时效性衰减C4: {temporal_decay_c4:.4f}")

View File

@ -152,6 +152,12 @@ class UniversalAPIManager:
api_key = provider_config.get('api_key')
if api_key:
prepared_params['token'] = api_key
elif provider == 'dajiala':
# JustOneAPI需要token参数
api_key = provider_config.get('api_key')
if api_key:
prepared_params['key'] = api_key
return prepared_params
@ -224,6 +230,20 @@ class UniversalAPIManager:
'endpoint': endpoint
}
time.sleep(2 ** attempt) # 指数退避
def wx_index(self,keyword):
"""
微信指数
"""
data = {
"keyword": keyword,
"mode": 2,
"BusinessType": 8192,
"sub_search_type": 0,
"verifycode": ""
}
return self.make_request('dajiala', 'web_search', data)
# 站长之家API的便捷方法
def query_copyright_software(self, company_name: str, chinaz_ver: str = "1") -> Dict[str, Any]: