#!/usr/bin/env python3 """ 测试动态默认值计算逻辑 """ import sys import os import asyncio # 添加项目根目录到 Python 路径 project_root = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, project_root) from app.utils.calculation_engine.market_value_c.market_data_analyzer import MarketDataAnalyzer async def test_market_data_analyzer(): """测试市场数据分析器""" print("=== 测试市场数据分析器 ===") analyzer = MarketDataAnalyzer() # 测试获取近期交易中位数 print("\n1. 测试获取近期交易中位数:") try: median_price = await analyzer.get_recent_transaction_median(days=30) print(f" 近30天交易中位数: {median_price}万元") except Exception as e: print(f" 获取交易中位数失败: {e}") # 测试自适应默认值计算 print("\n2. 测试自适应默认值计算:") test_cases = [ ("限量", "文化艺术"), ("普通", "科技创新"), ("稀有", "传统工艺"), ("", "") # 空值测试 ] for issuance_level, asset_type in test_cases: try: adaptive_price = analyzer.calculate_adaptive_default_value(asset_type, issuance_level) print(f" 发行级别: {issuance_level or '未知'}, 资产类型: {asset_type or '未知'} -> {adaptive_price}万元") except Exception as e: print(f" 计算自适应默认值失败 ({issuance_level}, {asset_type}): {e}") async def test_market_value_c_integration(): """测试市场估值C的集成""" print("\n=== 测试市场估值C集成 ===") from app.utils.calculation_engine.market_value_c.market_value_c import MarketValueCCalculator calculator = MarketValueCCalculator() # 测试数据:没有提供 average_transaction_price test_data = { 'daily_browse_volume': 500.0, 'collection_count': 50, 'issuance_level': '限量', 'recent_market_activity': '近一周' } print(f"\n测试输入数据: {test_data}") try: result = await calculator.calculate_complete_market_value_c(test_data) print(f"\n计算结果:") print(f" 市场估值C: {result['market_value_c']}万元") print(f" 市场竞价C1: {result['market_bidding_c1']}万元") print(f" 热度系数C2: {result['heat_coefficient_c2']}") print(f" 稀缺性乘数C3: {result['scarcity_multiplier_c3']}") print(f" 时间衰减C4: {result['temporal_decay_c4']}") except Exception as e: print(f"计算失败: {e}") import traceback traceback.print_exc() async def main(): """主测试函数""" print("开始测试动态默认值计算逻辑...") await test_market_data_analyzer() await test_market_value_c_integration() print("\n测试完成!") if __name__ == "__main__": asyncio.run(main())