diff --git a/app/api/v1/app_valuations/app_valuations.py b/app/api/v1/app_valuations/app_valuations.py index 6ad99b9..0e0e949 100644 --- a/app/api/v1/app_valuations/app_valuations.py +++ b/app/api/v1/app_valuations/app_valuations.py @@ -71,18 +71,18 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate logger.warning("valuation.policy_fetch_timeout industry={} err={}", data.industry, repr(e)) policy_match_score = getattr(policy_obj, 'score', 0.0) or 0.0 + + # 提取 经济价值B1 计算参数 + input_data_by_b1 = await _extract_calculation_params_b1( + data, esg_score=esg_score, industry_coefficient=fix_num_score, policy_match_score=policy_match_score + ) + # ESG关联价值 ESG分 (0-10分) input_data_by_b1["esg_score"] = esg_score # 行业修正系数I input_data_by_b1["industry_coefficient"] = fix_num_score # 政策匹配度 input_data_by_b1["policy_match_score"] = policy_match_score - # 提取 经济价值B1 计算参数 - input_data_by_b1 = await _extract_calculation_params_b1( - data, esg_score=esg_score, industry_coefficient=fix_num_score, policy_match_score=policy_match_score - ) - - # 侵权分 默认 6 try: @@ -151,7 +151,6 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate "market_data": input_data_by_c, } - calculator = FinalValueACalculator() # 先创建估值记录以获取ID,方便步骤落库关联 initial_detail = await user_valuation_controller.create_valuation( @@ -165,8 +164,85 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate valuation_id = initial_detail.id logger.info("valuation.init_created user_id={} valuation_id={}", user_id, valuation_id) + # 步骤1:立即更新计算输入参数(不管后续是否成功) + try: + await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate( + calculation_input=input_data, + ) + ) + logger.info("valuation.input_updated valuation_id={}", valuation_id) + except Exception as e: + logger.warning("valuation.failed_to_update_input valuation_id={} err={}", valuation_id, repr(e)) + + # 步骤1.5:更新内置API计算字段 + try: + # 准备内置API计算字段的值 + api_calc_fields = {} + + # ESG关联价值 + api_calc_fields["esg_value"] = str(esg_score) if esg_score is not None else None + + # 政策匹配度 + api_calc_fields["policy_matching"] = str(policy_match_score) if policy_match_score is not None else None + + # 侵权记录/法律风险 + infringement_record_value = "有侵权记录" if infringement_score == 0.0 else "无侵权记录" + api_calc_fields["infringement_record"] = infringement_record_value + api_calc_fields["legal_risk"] = infringement_record_value + + # 专利使用量 + patent_count_value = input_data_by_b1.get("patent_count", 0.0) + api_calc_fields["patent_count"] = str(patent_count_value) if patent_count_value is not None else None + + # 结构复杂度(纹样基因熵值B22) + structure_complexity_value = input_data_by_b2.get("structure_complexity", 1.5) + api_calc_fields["pattern_complexity"] = str(structure_complexity_value) if structure_complexity_value is not None else None + + # 归一化信息熵H + normalized_entropy_value = input_data_by_b2.get("normalized_entropy", 9) + api_calc_fields["normalized_entropy"] = str(normalized_entropy_value) if normalized_entropy_value is not None else None + + # 线上课程点击量(暂时没有计算逻辑,设为None或默认值) + # api_calc_fields["online_course_views"] = None + + # 基础质押率和流量修正系数(暂时没有计算逻辑,设为None或默认值) + # api_calc_fields["base_pledge_rate"] = None + # api_calc_fields["flow_correction"] = None + + if api_calc_fields: + await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate(**api_calc_fields) + ) + logger.info("valuation.api_calc_fields_updated valuation_id={} fields={}", valuation_id, list(api_calc_fields.keys())) + except Exception as e: + logger.warning("valuation.failed_to_update_api_calc_fields valuation_id={} err={}", valuation_id, repr(e)) + # 计算最终估值A(统一计算),传入估值ID以关联步骤落库 calculation_result = await calculator.calculate_complete_final_value_a(valuation_id, input_data) + + # 步骤2:更新计算结果字段(模型估值B、市场估值C、最终估值AB、完整计算结果) + try: + await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate( + model_value_b=calculation_result.get('model_value_b'), + market_value_c=calculation_result.get('market_value_c'), + final_value_ab=calculation_result.get('final_value_ab'), + calculation_result=calculation_result, + ) + ) + logger.info( + "valuation.result_updated valuation_id={} model_b={} market_c={} final_ab={}", + valuation_id, + calculation_result.get('model_value_b'), + calculation_result.get('market_value_c'), + calculation_result.get('final_value_ab'), + ) + except Exception as e: + logger.warning("valuation.failed_to_update_result valuation_id={} err={}", valuation_id, repr(e)) # 计算动态质押 drp_c = DynamicPledgeRateCalculator() @@ -176,7 +252,45 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate ''' # 解析月交易额字符串为数值 monthly_amount = drp_c.parse_monthly_transaction_amount(data.monthly_transaction_amount or "") + drp_start_ts = time.monotonic() drp_result = drp_c.calculate_dynamic_pledge_rate(monthly_amount, data.heritage_asset_level) + drp_duration_ms = int((time.monotonic() - drp_start_ts) * 1000) + + # 记录动态质押率计算步骤 + await valuation_controller.log_formula_step( + valuation_id, + "DYNAMIC_PLEDGE_RATE", + status="completed", + input_params={ + "monthly_transaction_amount": data.monthly_transaction_amount, + "monthly_amount": monthly_amount, + "heritage_asset_level": data.heritage_asset_level, + }, + output_result={ + "dynamic_pledge_rate": drp_result, + "duration_ms": drp_duration_ms, + }, + ) + logger.info("valuation.drp_calculated valuation_id={} drp={} duration_ms={}", valuation_id, drp_result, drp_duration_ms) + + # 步骤3:更新动态质押率及相关字段 + try: + # 从动态质押率计算器中获取基础质押率和流量修正系数 + base_pledge_rate_value = "0.5" # 固定值:基础质押率 = 0.5 + flow_correction_value = "0.3" # 固定值:流量修正系数 = 0.3 + + await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate( + dynamic_pledge_rate=drp_result, + base_pledge_rate=base_pledge_rate_value, + flow_correction=flow_correction_value, + ) + ) + logger.info("valuation.drp_updated valuation_id={} drp={} base_rate={} flow_correction={}", + valuation_id, drp_result, base_pledge_rate_value, flow_correction_value) + except Exception as e: + logger.warning("valuation.failed_to_update_drp valuation_id={} err={}", valuation_id, repr(e)) # 结构化日志:关键分值 try: @@ -192,33 +306,21 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate except Exception: pass - # 更新估值评估记录(写入计算结果与输入摘要) - update_data = ValuationAssessmentUpdate( - model_value_b=calculation_result.get('model_value_b'), - market_value_c=calculation_result.get('market_value_c'), - final_value_ab=calculation_result.get('final_value_ab'), - dynamic_pledge_rate=drp_result, - calculation_result=calculation_result, - calculation_input={ - 'model_data': { - 'economic_data': list(input_data.get('model_data', {}).get('economic_data', {}).keys()), - 'cultural_data': list(input_data.get('model_data', {}).get('cultural_data', {}).keys()), - 'risky_data': list(input_data.get('model_data', {}).get('risky_data', {}).keys()), - }, - 'market_data': list(input_data.get('market_data', {}).keys()), - }, - status='success' - ) - result = await valuation_controller.update(valuation_id, update_data) - logger.info( - "valuation.updated valuation_id={} model_b={} market_c={} final_ab={}", - valuation_id, - calculation_result.get('model_value_b'), - calculation_result.get('market_value_c'), - calculation_result.get('final_value_ab'), - ) + # 步骤4:最后更新状态为成功 + try: + result = await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate( + status='success' + ) + ) + logger.info("valuation.status_updated valuation_id={} status=success", valuation_id) + except Exception as e: + logger.warning("valuation.failed_to_update_status valuation_id={} err={}", valuation_id, repr(e)) + # 即使状态更新失败,也尝试获取结果用于日志 + result = await valuation_controller.get_by_id(valuation_id) - logger.info("valuation.background_calc_success user_id={} valuation_id={}", user_id, result.id) + logger.info("valuation.background_calc_success user_id={} valuation_id={}", user_id, valuation_id) except Exception as e: import traceback @@ -228,8 +330,53 @@ async def _perform_valuation_calculation(user_id: int, data: UserValuationCreate # 计算失败时更新记录为失败状态 try: if 'valuation_id' in locals(): - fail_update = ValuationAssessmentUpdate(status='rejected') - await valuation_controller.update(valuation_id, fail_update) + # 准备失败时需要更新的字段 + fail_update_fields = {"status": "rejected"} + + # 如果 input_data 已经准备好,确保 calculation_input 被更新(即使计算失败) + if 'input_data' in locals(): + fail_update_fields["calculation_input"] = input_data + + # 如果内置API计算字段已经准备好,也尝试更新(即使计算失败) + # 这些字段在步骤1.5中计算,如果步骤1.5执行了,这些变量应该已经存在 + api_calc_fields = {} + if 'esg_score' in locals(): + api_calc_fields["esg_value"] = str(esg_score) if esg_score is not None else None + if 'policy_match_score' in locals(): + api_calc_fields["policy_matching"] = str(policy_match_score) if policy_match_score is not None else None + if 'infringement_score' in locals(): + infringement_record_value = "有侵权记录" if infringement_score == 0.0 else "无侵权记录" + api_calc_fields["infringement_record"] = infringement_record_value + api_calc_fields["legal_risk"] = infringement_record_value + if 'input_data_by_b1' in locals(): + patent_count_value = input_data_by_b1.get("patent_count", 0.0) + api_calc_fields["patent_count"] = str(patent_count_value) if patent_count_value is not None else None + if 'input_data_by_b2' in locals(): + structure_complexity_value = input_data_by_b2.get("structure_complexity", 1.5) + api_calc_fields["pattern_complexity"] = str(structure_complexity_value) if structure_complexity_value is not None else None + normalized_entropy_value = input_data_by_b2.get("normalized_entropy", 9) + api_calc_fields["normalized_entropy"] = str(normalized_entropy_value) if normalized_entropy_value is not None else None + + # 合并所有需要更新的字段 + fail_update_fields.update(api_calc_fields) + + try: + await valuation_controller.update( + valuation_id, + ValuationAssessmentUpdate(**fail_update_fields) + ) + logger.info("valuation.failed_but_fields_saved valuation_id={} fields={}", valuation_id, list(fail_update_fields.keys())) + except Exception as input_err: + logger.warning("valuation.failed_to_save_fields_on_error valuation_id={} err={}", valuation_id, repr(input_err)) + # 如果保存失败,至少更新状态 + try: + fail_update = ValuationAssessmentUpdate(status='rejected') + await valuation_controller.update(valuation_id, fail_update) + except Exception: + pass + else: + # 如果 valuation_id 都不存在,说明在创建记录时就失败了,无法更新 + logger.warning("valuation.failed_before_creation user_id={}", user_id) except Exception as create_error: logger.error("valuation.failed_to_update_record user_id={} err={}", user_id, repr(create_error)) diff --git a/app/schemas/valuation.py b/app/schemas/valuation.py index fa44b24..5b1524c 100644 --- a/app/schemas/valuation.py +++ b/app/schemas/valuation.py @@ -100,10 +100,13 @@ class ValuationAssessmentUpdate(BaseModel): # 非遗等级与技术 inheritor_level: Optional[str] = Field(None, description="非遗传承人等级") + inheritor_ages: Optional[List[int]] = Field(None, description="传承人年龄列表") inheritor_age_count: Optional[List[Any]] = Field(None, description="非遗传承人年龄水平及数量") inheritor_certificates: Optional[List[Any]] = Field(None, description="非遗传承人等级证书") heritage_level: Optional[str] = Field(None, description="非遗等级") + heritage_asset_level: Optional[str] = Field(None, description="非遗资产等级") patent_application_no: Optional[str] = Field(None, description="非遗资产所用专利的申请号") + patent_remaining_years: Optional[str] = Field(None, description="专利剩余年限") historical_evidence: Optional[Dict[str, int]] = Field(None, description="非遗资产历史证明证据及数量") patent_certificates: Optional[List[Any]] = Field(None, description="非遗资产所用专利的证书") pattern_images: Optional[List[Any]] = Field(None, description="非遗纹样图片") @@ -111,22 +114,56 @@ class ValuationAssessmentUpdate(BaseModel): certificate_url: Optional[str] = Field(None, description="证书URL") # 非遗应用与推广 + implementation_stage: Optional[str] = Field(None, description="非遗资产应用成熟度") application_maturity: Optional[str] = Field(None, description="非遗资产应用成熟度") application_coverage: Optional[str] = Field(None, description="非遗资产应用覆盖范围") + coverage_area: Optional[str] = Field(None, description="应用覆盖范围") cooperation_depth: Optional[str] = Field(None, description="非遗资产跨界合作深度") + collaboration_type: Optional[str] = Field(None, description="跨界合作类型") offline_activities: Optional[str] = Field(None, description="近12个月线下相关宣讲活动次数") + offline_teaching_count: Optional[int] = Field(None, description="近12个月线下相关演讲活动次数") online_accounts: Optional[List[Any]] = Field(None, description="线上相关宣传账号信息") + platform_accounts: Optional[Dict[str, Dict[str, Union[str, int]]]] = Field(None, description="线上相关宣传账号信息") # 非遗资产衍生商品信息 sales_volume: Optional[str] = Field(None, description="该商品近12个月销售量") link_views: Optional[str] = Field(None, description="该商品近12个月的链接浏览量") circulation: Optional[str] = Field(None, description="该商品的发行量") + scarcity_level: Optional[str] = Field(None, description="稀缺等级") last_market_activity: Optional[str] = Field(None, description="该商品最近一次市场活动时间") + market_activity_time: Optional[str] = Field(None, description="市场活动的时间") monthly_transaction: Optional[str] = Field(None, description="月交易额") + monthly_transaction_amount: Optional[str] = Field(None, description="月交易额") price_fluctuation: Optional[List[Union[str, int, float]]] = Field(None, description="该商品近30天价格波动区间") + price_range: Optional[Dict[str, Union[int, float]]] = Field(None, description="资产商品的价格波动率") + market_price: Optional[Union[int, float]] = Field(None, description="市场价格(单位:万元)") credit_code_or_id: Optional[str] = Field(None, description="统一社会信用代码或身份证号") biz_intro: Optional[str] = Field(None, description="业务/传承介绍") + # 内置API计算字段 + infringement_record: Optional[str] = Field(None, description="侵权记录") + patent_count: Optional[str] = Field(None, description="专利使用量") + esg_value: Optional[str] = Field(None, description="ESG关联价值") + policy_matching: Optional[str] = Field(None, description="政策匹配度") + online_course_views: Optional[int] = Field(None, description="线上课程点击量") + pattern_complexity: Optional[str] = Field(None, description="结构复杂度") + normalized_entropy: Optional[str] = Field(None, description="归一化信息熵") + legal_risk: Optional[str] = Field(None, description="法律风险-侵权诉讼历史") + base_pledge_rate: Optional[str] = Field(None, description="基础质押率") + flow_correction: Optional[str] = Field(None, description="流量修正系数") + + # 计算结果字段 + model_value_b: Optional[float] = Field(None, description="模型估值B(万元)") + market_value_c: Optional[float] = Field(None, description="市场估值C(万元)") + final_value_ab: Optional[float] = Field(None, description="最终估值AB(万元)") + dynamic_pledge_rate: Optional[float] = Field(None, description="动态质押率") + calculation_result: Optional[Dict[str, Any]] = Field(None, description="完整计算结果JSON") + calculation_input: Optional[Dict[str, Any]] = Field(None, description="计算输入参数JSON") + + # 系统字段 + status: Optional[str] = Field(None, description="评估状态: pending(待审核), success(已通过), fail(已拒绝)") + admin_notes: Optional[str] = Field(None, description="管理员备注") + is_active: Optional[bool] = Field(None, description="是否激活") @field_validator('report_url', 'certificate_url', mode='before')