from random import random from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status from typing import Optional, List, Dict, Any import json import asyncio import time from app.controllers.user_valuation import user_valuation_controller from app.schemas.valuation import ( UserValuationCreate, UserValuationQuery, UserValuationList, UserValuationOut, UserValuationDetail ) from app.schemas.base import Success, SuccessExtra from app.utils.app_user_jwt import get_current_app_user_id, get_current_app_user from app.utils.calculation_engine import FinalValueACalculator from app.utils.calculation_engine.drp import DynamicPledgeRateCalculator from app.utils.calculation_engine.economic_value_b1.sub_formulas.basic_value_b11 import calculate_popularity_score, \ calculate_infringement_score, calculate_patent_usage_score, calculate_patent_score from app.utils.calculation_engine.economic_value_b1.sub_formulas.traffic_factor_b12 import calculate_search_index_s1 from app.log.log import logger from app.models.esg import ESG from app.models.industry import Industry from app.models.policy import Policy from app.models.user import AppUser from app.utils.universal_api_manager import universal_api from app.utils.wechat_index_calculator import wechat_index_calculator app_valuations_router = APIRouter(tags=["用户端估值评估"]) @app_valuations_router.post("/", summary="创建估值评估") async def calculate_valuation( data: UserValuationCreate, user_id: int = Depends(get_current_app_user_id) ): """ 计算估值评估 根据用户提交的估值评估数据,调用计算引擎进行经济价值B1计算 请求示例JSON (仅包含用户填写部分): { "asset_name": "传统刺绣工艺", "institution": "某文化传承机构", "industry": "传统手工艺", // 财务状况 (用户填写) "annual_revenue": "500", // 近12个月机构营收/万元 "rd_investment": "50", // 近12个月机构研发投入/万元 "three_year_income": [400, 450, 500], // 近三年机构收益/万元 // 非遗等级与技术 (用户填写) "inheritor_level": "国家级传承人", // 非遗传承人等级 "inheritor_ages": [45, 60, 75], // 传承人年龄列表 "heritage_level": "国家级", // 非遗等级 "patent_application_no": "CN202310123456.7", // 专利申请号 "patent_remaining_years": "15", // 专利剩余年限 "historical_evidence": { // 历史证明证据及数量 "历史文献": 3, "考古发现": 2, "传承谱系": 5 }, "pattern_images": ["demo.jpg"], // 非遗纹样图片 // 非遗应用与推广 (用户填写) "application_maturity": "成熟应用", // 应用成熟度 "application_coverage": "全国覆盖", // 应用覆盖范围 "cooperation_depth": "0.5", // 跨界合作深度 "offline_activities": "12", // 近12个月线下宣讲活动次数 "online_accounts": [ // 线上宣传账号信息 {"platform": "抖音", "account": "传统刺绣大师"}, {"platform": "微博", "account": "非遗传承人"} ], // 市场信息 (用户填写) "sales_volume": "1000", // 近12个月销售量 "link_views": "5000", // 近12个月链接浏览量 "circulation": "限量", // 发行量 "last_market_activity": "2024-01-15", // 最近一次市场活动时间 "price_fluctuation": [95.0, 105.0], // 近30天价格波动区间 "manual_bids": [48000.0, 50000.0, 52000.0], // 手动收集的竞价列表 // 政策相关 (用户填写) "funding_status": "国家级资助", // 资金支持情况 "implementation_stage": "成熟应用" // 实施阶段 } API获取参数 (系统自动获取,无需用户填写): - 搜索指数: 百度、微信、微博搜索指数 - 社交媒体数据: 点赞数、评论数、转发数、粉丝数 - 交易数据: 近3个月加权平均价格 - 热度数据: 近7日日均浏览量、收藏数 - ESG评分: 根据行业自动匹配 - 行业系数: 根据行业ROE计算 - 政策匹配度: 根据行业自动匹配 - 专利验证: 通过API验证专利有效性 - 侵权记录: 通过API查询侵权诉讼历史 """ try: start_ts = time.monotonic() logger.info("valuation.calc_start user_id={} asset_name={} industry={}", user_id, getattr(data, 'asset_name', None), getattr(data, 'industry', None)) # 根据行业查询 ESG 基准分(优先用行业名称匹配,如用的是行业代码就把 name 改成 code) esg_obj = None industry_obj = None policy_obj = None try: esg_obj = await asyncio.wait_for(ESG.filter(name=data.industry).first(), timeout=2.0) except Exception as e: logger.warning("valuation.esg_fetch_timeout industry={} err={}", data.industry, repr(e)) esg_score = float(getattr(esg_obj, 'number', 0.0) or 0.0) # 根据行业查询 行业修正系数与ROE try: industry_obj = await asyncio.wait_for(Industry.filter(name=data.industry).first(), timeout=2.0) except Exception as e: logger.warning("valuation.industry_fetch_timeout industry={} err={}", data.industry, repr(e)) fix_num_score = getattr(industry_obj, 'fix_num', 0.0) or 0.0 # 根据行业查询 政策匹配度 try: policy_obj = await asyncio.wait_for(Policy.filter(name=data.industry).first(), timeout=2.0) except Exception as e: logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e)) policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0 # 提取 经济价值B1 计算参数 input_data_by_b1 = await _extract_calculation_params_b1(data) # ESG关联价值 ESG分 (0-10分) input_data_by_b1["esg_score"] = esg_score # 行业修正系数I input_data_by_b1["industry_coefficient"] = fix_num_score # 政策匹配度 input_data_by_b1["policy_match_score"] = policy_match_score # 侵权分 默认 6 try: judicial_data = universal_api.query_judicial_data(data.institution) _data = judicial_data["data"].get("target",None) # 诉讼标的 if _data: infringement_score = 0.0 else: infringement_score = 10.0 except: infringement_score = 0.0 input_data_by_b1["infringement_score"] = infringement_score # 获取专利信息 TODO 参数 try: patent_data = universal_api.query_patent_info(data.industry) except Exception as e: logger.warning("valuation.patent_api_error err={}", repr(e)) input_data_by_b1["patent_count"] = 0.0 input_data_by_b1["patent_score"] = 0.0 patent_dict = patent_data if isinstance(patent_data, dict) else {} inner_data = patent_dict.get("data", {}) if isinstance(patent_dict.get("data", {}), dict) else {} data_list = inner_data.get("dataList", []) data_list = data_list if isinstance(data_list, list) else [] # 验证 专利剩余年限 # 发展潜力D相关参数 专利数量 # 查询匹配申请号的记录集合 matched = [item for item in data_list if isinstance(item, dict) and item.get("SQH") == getattr(data, 'patent_application_no', None)] if matched: patent_count = calculate_patent_usage_score(len(matched)) input_data_by_b1["patent_count"] = float(patent_count) else: input_data_by_b1["patent_count"] = 0.0 patent_score = calculate_patent_score(calculate_total_years(data_list)) input_data_by_b1["patent_score"] = patent_score # 提取 文化价值B2 计算参数 input_data_by_b2 = await _extract_calculation_params_b2(data) # 提取 风险调整系数B3 计算参数 input_data_by_b3 = await _extract_calculation_params_b3(data) if infringement_score == 10.0: input_data_by_b3["lawsuit_status"] = "无诉讼状态" if 0 < infringement_score < 4.0: input_data_by_b3["lawsuit_status"] = "已解决诉讼" else: input_data_by_b3["lawsuit_status"] = "未解决诉讼" # 提取 市场估值C 参数 input_data_by_c = await _extract_calculation_params_c(data) input_data = { # 模型估值B 相关参数 "model_data": { # 经济价值B1 参数 "economic_data": input_data_by_b1, # 文化价值B2 参数 "cultural_data": input_data_by_b2, # 风险调整参数 B3 "risky_data": input_data_by_b3, }, # 市场估值C 参数 "market_data": input_data_by_c, } calculator = FinalValueACalculator() # 计算最终估值A(统一计算) calculation_result = calculator.calculate_complete_final_value_a(input_data) # 计算动态质押 drp_c = DynamicPledgeRateCalculator() ''' monthly_amount (float): 月交易额(万元) heritage_level (str): 非遗等级 ''' # 解析月交易额字符串为数值 monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "") drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level) # 结构化日志:关键分值 try: duration_ms = int((time.monotonic() - start_ts) * 1000) logger.info( "valuation.calc_done user_id={} duration_ms={} model_value_b={} market_value_c={} final_value_ab={}", user_id, duration_ms, calculation_result.get('model_value_b'), calculation_result.get('market_value_c'), calculation_result.get('final_value_ab'), ) except Exception: pass # 创建估值评估记录 result = await user_valuation_controller.create_valuation( user_id=user_id, data=data, calculation_result=calculation_result, calculation_input={ 'model_data': { 'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()), 'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()), 'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()), }, 'market_data': list(input_data.get('market_data', {}).keys()), }, drp_result=drp_result, status='approved' # 计算成功,设置为approved状态 ) # 组装返回 result_dict = json.loads(result.model_dump_json()) # "calculation_result": { # "model_value_b": 660.1534497474814, # "market_value_c": 8800.0, # "final_value_ab": 3102.107414823237 # } result_dict['calculation_result'] = calculation_result result_dict['calculation_input'] = { 'model_data': { 'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()), 'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()), 'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()), }, 'market_data': list(input_data.get('market_data', {}).keys()), } return Success(data=result_dict, msg="估值计算完成") except Exception as e: import traceback print(traceback.format_exc()) logger.error("valuation.calc_failed user_id={} err={}", user_id, repr(e)) # 计算失败时也创建记录,状态设置为failed try: result = await user_valuation_controller.create_valuation( user_id=user_id, data=data, calculation_result=None, calculation_input=None, drp_result=None, status='rejected' # 计算失败,设置为rejected状态 ) logger.info("valuation.failed_record_created user_id={} valuation_id={}", user_id, result.id) except Exception as create_error: logger.error("valuation.failed_to_create_record user_id={} err={}", user_id, repr(create_error)) raise HTTPException(status_code=400, detail=f"计算失败: {str(e)}") async def _extract_calculation_params_b1(data: UserValuationCreate) -> Dict[str, Any]: """ 从用户提交的数据中提取计算所需的参数 Args: data: 用户提交的估值评估数据 Returns: Dict: 计算所需的参数字典 """ # 基础价值B11相关参数 # 财务价值所需数据 从近三年收益计算 three_year_income = data.three_year_income or [0, 0, 0] # 法律强度L相关参数 # 普及地域分值 默认 7分 popularity_score = calculate_popularity_score(data.application_coverage) # 创新投入比 = (研发费用/营收) * 100 try: rd_investment = float(data.rd_investment or 0) annual_revenue = float(data.annual_revenue or 1) # 避免除零 innovation_ratio = (rd_investment / annual_revenue) * 100 if annual_revenue > 0 else 0 except (ValueError, TypeError): innovation_ratio = 0.0 # 流量因子B12相关参数 # 近30天搜索指数S1 - 从社交媒体数据计算 TODO 需要使用第三方API baidu_index = 0 # 获取微信指数并计算近30天平均值 try: wechat_index_response = universal_api.wx_index(data.asset_name) wechat_index = wechat_index_calculator.process_wechat_index_response(wechat_index_response) logger.info(f"资产 '{data.asset_name}' 的微信指数近30天平均值: {wechat_index}") except Exception as e: logger.error(f"获取微信指数失败: {e}") wechat_index = 0 weibo_index = 0 search_index_s1 = calculate_search_index_s1(baidu_index, wechat_index, weibo_index) # 默认值,实际应从API获取 # 行业均值S2 - 从数据库查询行业数据计算 from app.utils.industry_calculator import calculate_industry_average_s2 industry_average_s2 = await calculate_industry_average_s2(data.industry) # 社交媒体传播度S3 - TODO 需要使用第三方API,click_count view_count 未找到对应参数 # likes: 点赞数(API获取) # comments: 评论数(API获取) # shares: 转发数(API获取) # followers: 粉丝数 # click_count: 商品链接点击量(用户填写) sales_volume 使用 sales_volume 销售量 # view_count: 内容浏览量(用户填写) link_views # 政策乘数B13相关参数 # 政策契合度评分P - 根据行业和资助情况计算 # 实施阶段 - 需要转换为对应的评分 implementation_stage_str = data.application_maturity or "成熟应用" # 资金支持 - 需要转换为对应的评分 funding_support_str = data.funding_status or "无资助" # 使用PolicyMultiplierB13Calculator来计算评分 from app.utils.calculation_engine.economic_value_b1.sub_formulas.policy_multiplier_b13 import \ PolicyMultiplierB13Calculator policy_calculator = PolicyMultiplierB13Calculator() implementation_stage = policy_calculator.calculate_implementation_stage_score(implementation_stage_str) funding_support = policy_calculator.calculate_funding_support_score(funding_support_str) return { # 基础价值B11相关参数 'three_year_income': three_year_income, 'popularity_score': popularity_score, 'innovation_ratio': innovation_ratio, # 流量因子B12相关参数 'search_index_s1': search_index_s1, 'industry_average_s2': industry_average_s2, 'social_media_spread_s3': 0.0, # 这些社交数据暂未来源,置为0,后续接入API再填充 'likes': 0, 'comments': 0, 'shares': 0, # followers 非当前计算用键,先移除避免干扰 # click_count 与 view_count 目前未参与计算,先移除 # 政策乘数B13相关参数 'implementation_stage': implementation_stage, 'funding_support': funding_support } # 获取 文化价值B2 相关参数 async def _extract_calculation_params_b2(data: UserValuationCreate) -> Dict[str, Any]: """ argrg: data: 用户提交的估值评估数据 retus: Dict: 计算所需的参数字典 """ # 导入计算器来转换传承人等级 from app.utils.calculation_engine.cultural_value_b2.sub_formulas.living_heritage_b21 import \ LivingHeritageB21Calculator # 活态传承系数B21 县官参数 living_heritage_calculator = LivingHeritageB21Calculator() inheritor_level = data.inheritor_level or "市级传承人" # 设置默认值 inheritor_level_coefficient = living_heritage_calculator.calculate_inheritor_level_coefficient(inheritor_level) offline_sessions = int(data.offline_activities) # 线下传习次数 # 以下调用API douyin\bilibili\kuaishou douyin_views = 0 kuaishou_views = 0 bilibili_views = 0 # 跨界合作深度 品牌联名0.3,科技载体0.5,国家外交礼品1.0 cross_border_depth = float(data.cooperation_depth) # 纹样基因值B22相关参数 # 以下三项需由后续模型/服务计算;此处提供默认可计算占位 historical_inheritance = 0.8 structure_complexity = 0.75 normalized_entropy = 0.85 return { "inheritor_level_coefficient": inheritor_level_coefficient, "offline_sessions": offline_sessions, "douyin_views": douyin_views, "kuaishou_views": kuaishou_views, "bilibili_views": bilibili_views, "cross_border_depth": cross_border_depth, "historical_inheritance": historical_inheritance, "structure_complexity": structure_complexity, "normalized_entropy": normalized_entropy, } # 获取 文化价值B2 相关参数 async def _extract_calculation_params_b3(data: UserValuationCreate) -> Dict[str, Any]: # 过去30天最高价格 过去30天最低价格 TODO 需要根据字样进行切分获取最高价和最低价 转换成 float 类型 price_fluctuation = [float(i) for i in data.price_fluctuation] highest_price, lowest_price = max(price_fluctuation), min(price_fluctuation) # lawsuit_status = "无诉讼" # 诉讼状态 TODO (API获取) inheritor_ages = data.inheritor_age_count # [45, 60, 75] # 传承人年龄列表 return { "highest_price": highest_price, "lowest_price": lowest_price, "inheritor_ages": inheritor_ages, } # 获取 市场估值C 相关参数 async def _extract_calculation_params_c(data: UserValuationCreate) -> Dict[str, Any]: # 市场竞价C1 TODO 暂无 # transaction_data: 交易数据字典(API获取) # manual_bids: 手动收集的竞价列表(用户填写) # expert_valuations: 专家估值列表(系统配置) transaction_data: Dict = None manual_bids: List[float] = None # TODO 需要客户确认 三个数值 expert_valuations = None # 浏览热度分 TODO 需要先确定平台信息 daily_browse_volume = 500.0 # 近7日日均浏览量(默认占位) collection_count = 50 # 收藏数(默认占位) # 稀缺性乘数C3 发行量 circulation = data.circulation or '限量' # 时效性衰减C4 参数 用户选择距离最近一次市场活动(交易、报价、评估)的相距时间 recent_market_activity = data.last_market_activity # 如果为空、None或"0",设置默认值 if not recent_market_activity or recent_market_activity == "0": recent_market_activity = '2024-01-15' return { # 计算市场竞价C1 # C1 的实现接受 transaction_data={'weighted_average_price': x} "weighted_average_price": transaction_data, "manual_bids": manual_bids, # 手动收集的竞价列表 (用户填写) "expert_valuations": expert_valuations, # 专家估值列表 (系统配置) # 计算热度系数C2 "daily_browse_volume": daily_browse_volume, # 近7日日均浏览量 (API获取) "collection_count": collection_count, # 收藏数 "issuance_level": circulation, # 默认 限量发行 计算稀缺性乘数C3 "recent_market_activity": recent_market_activity, # 默认 '近一月' 计算市场估值C } @app_valuations_router.get("/", summary="获取我的估值评估列表") async def get_my_valuations( query: UserValuationQuery = Depends(), current_user: AppUser = Depends(get_current_app_user) ): """ 获取当前用户的估值评估列表 """ try: result = await user_valuation_controller.get_user_valuations( user_id=current_user.id, query=query ) # 使用model_dump_json()来正确序列化datetime,然后解析为dict列表 import json serialized_items = [json.loads(item.model_dump_json()) for item in result.items] return SuccessExtra( data=serialized_items, total=result.total, page=result.page, page_size=result.size, pages=result.pages, msg="获取估值评估列表成功" ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取估值评估列表失败: {str(e)}" ) @app_valuations_router.get("/{valuation_id}", summary="获取估值评估详情") async def get_valuation_detail( valuation_id: int, current_user: AppUser = Depends(get_current_app_user) ): """ 获取指定估值评估的详细信息 """ try: result = await user_valuation_controller.get_user_valuation_detail( user_id=current_user.id, valuation_id=valuation_id ) if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="估值评估记录不存在" ) # 使用model_dump_json()来正确序列化datetime,然后解析为dict import json result_dict = json.loads(result.model_dump_json()) return Success(data=result_dict, msg="获取估值评估详情成功") except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取估值评估详情失败: {str(e)}" ) @app_valuations_router.get("/statistics/overview", summary="获取我的估值评估统计") async def get_my_valuation_statistics( current_user: AppUser = Depends(get_current_app_user) ): """ 获取当前用户的估值评估统计信息 """ try: result = await user_valuation_controller.get_user_valuation_statistics( user_id=current_user.id ) return Success(data=result, msg="获取统计信息成功") except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取统计信息失败: {str(e)}" ) def calculate_total_years(data_list): current_date = datetime.now().date() total_years = 0 date_count = 0 for item in data_list: if "SQRQ" in item and item["SQRQ"]: try: # 解析日期字符串 sqrq_date = datetime.strptime(item["SQRQ"], "%Y-%m-%d").date() # 计算与当前日期的差值(年,包含小数) date_diff = current_date - sqrq_date years_diff = date_diff.days / 365.25 total_years += years_diff date_count += 1 except ValueError as e: return 0 return total_years