guzhi/app/controllers/valuation.py
2025-12-04 14:44:23 +08:00

881 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from typing import Any, Dict, List, Optional
from tortoise.expressions import Q
from tortoise.queryset import QuerySet
from tortoise.functions import Count
from app.models.valuation import ValuationAssessment, ValuationCalculationStep
from app.schemas.valuation import (
ValuationAssessmentCreate,
ValuationAssessmentUpdate,
ValuationAssessmentQuery,
ValuationAssessmentOut,
ValuationAssessmentList,
ValuationCalculationStepCreate,
ValuationCalculationStepOut
)
from app.models.user import AppUser
from app.utils.calculation_engine.formula_registry import get_formula_meta
class ValuationController:
"""估值评估控制器"""
model = ValuationAssessment
step_model = ValuationCalculationStep
# 参数说明映射表:将参数名(英文)映射到中文说明
PARAM_DESCRIPTIONS = {
# 财务价值相关
"three_year_income": "近三年收益(万元)",
"annual_revenue_3_years": "近三年收益(万元)",
"financial_value_f": "财务价值F",
# 法律强度相关
"patent_score": "专利分",
"popularity_score": "普及分",
"infringement_score": "侵权分",
"legal_strength_l": "法律强度L",
# 发展潜力相关
"patent_count": "专利数量",
"esg_score": "ESG分",
"innovation_ratio": "创新投入比",
"development_potential_d": "发展潜力D",
# 行业系数
"industry_coefficient": "行业系数I",
"target_industry_roe": "目标行业ROE",
"benchmark_industry_roe": "基准行业ROE",
# 流量因子相关
"search_index_s1": "搜索指数S1",
"industry_average_s2": "行业均值S2",
"social_media_spread_s3": "社交媒体传播度S3",
"likes": "点赞数",
"comments": "评论数",
"shares": "转发数",
"sales_volume": "销售量",
"link_views": "链接浏览量",
# 政策乘数相关
"implementation_stage": "实施阶段评分",
"funding_support": "资金支持度",
"policy_match_score": "政策匹配度",
# 文化价值相关
"inheritor_level_coefficient": "传承人等级系数",
"offline_sessions": "线下传习次数",
"douyin_views": "抖音浏览量",
"bilibili_views": "B站浏览量",
"kuaishou_views": "快手浏览量",
"cross_border_depth": "跨界合作深度",
"historical_inheritance": "历史传承度HI",
"structure_complexity": "结构复杂度SC",
"normalized_entropy": "归一化信息熵H",
# 风险调整相关
"highest_price": "最高价格",
"lowest_price": "最低价格",
"inheritor_ages": "传承人年龄列表",
"lawsuit_status": "诉讼状态",
# 市场估值相关
"manual_bids": "手动竞价列表",
"expert_valuations": "专家估值列表",
"weighted_average_price": "加权平均价格",
"daily_browse_volume": "日均浏览量",
"collection_count": "收藏数",
"issuance_level": "发行量",
"recent_market_activity": "最近市场活动时间",
# 动态质押率相关
"monthly_transaction_amount": "月交易额(万元)",
"monthly_amount": "月交易额(万元)",
"heritage_asset_level": "非遗等级",
"dynamic_pledge_rate": "动态质押率",
"base_pledge_rate": "基础质押率",
"flow_correction": "流量修正系数",
}
async def create_calculation_step(self, data: ValuationCalculationStepCreate) -> ValuationCalculationStepOut:
"""
创建估值计算步骤
Args:
data (ValuationCalculationStepCreate): 估值计算步骤数据
Returns:
ValuationCalculationStepOut: 创建的估值计算步骤
"""
step = await self.step_model.create(**data.model_dump())
logger.info(
"calcstep.create valuation_id={} order={} name={}",
data.valuation_id,
data.step_order,
data.step_name,
)
return ValuationCalculationStepOut.model_validate(step)
async def log_formula_step(
self,
valuation_id: int,
formula_code: str,
*,
status: str = "processing",
input_params: Optional[Dict[str, Any]] = None,
output_result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
step_description: Optional[str] = None,
duration_ms: Optional[int] = None,
) -> ValuationCalculationStepOut:
"""
幂等记录(或更新)某个公式节点的计算过程。
"""
meta = get_formula_meta(formula_code)
description = step_description or meta.formula
create_payload: Dict[str, Any] = {
"valuation_id": valuation_id,
"formula_code": meta.code,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
"step_description": description,
"status": status,
}
if input_params is not None:
create_payload["input_params"] = input_params
if output_result is not None:
create_payload["output_result"] = output_result
if error_message is not None:
create_payload["error_message"] = error_message
# 准备更新字段
update_fields: Dict[str, Any] = {
"status": status,
"step_description": description,
"formula_name": meta.name,
"formula_text": meta.formula,
"parent_formula_code": meta.parent_code,
"group_code": meta.group_code,
"step_order": meta.order,
"step_name": meta.name,
}
if input_params is not None:
update_fields["input_params"] = input_params
if output_result is not None:
update_fields["output_result"] = output_result
if error_message is not None:
update_fields["error_message"] = error_message
if duration_ms is not None:
result = update_fields.get("output_result") or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
update_fields["output_result"] = result
# 先尝试查询是否存在(明确排除 formula_code 为 NULL 的情况)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
# 如果没找到,再检查是否有 formula_code 为 NULL 的旧记录(不应该有,但为了安全)
if not step and meta.code:
# 检查是否有重复的旧记录formula_code 为 NULL
old_steps = await self.step_model.filter(
valuation_id=valuation_id,
formula_code__isnull=True
).all()
if old_steps:
logger.warning(
"calcstep.log_formula found old records with NULL formula_code: valuation_id={} count={}",
valuation_id,
len(old_steps),
)
logger.info(
"calcstep.log_formula query: valuation_id={} formula_code={} found={}",
valuation_id,
meta.code,
step is not None,
)
if step:
# 更新现有记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 尝试创建新记录
if duration_ms is not None:
result = create_payload.setdefault("output_result", {}) or {}
if not isinstance(result, dict):
result = {}
result["duration_ms"] = duration_ms
create_payload["output_result"] = result
try:
step = await self.step_model.create(**create_payload)
logger.info(
"calcstep.log_formula created valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
except Exception as e:
# 如果因为唯一约束冲突而失败(可能是并发插入),重新查询并更新
error_str = str(e).lower()
if "duplicate" in error_str or "unique" in error_str or "1062" in error_str:
logger.warning(
"calcstep.log_formula duplicate key detected, retrying query: {}",
str(e),
)
# 重新查询(可能已被其他请求插入)
step = await self.step_model.filter(
valuation_id=valuation_id,
formula_code=meta.code
).first()
if step:
# 更新刚插入的记录
await step.update_from_dict(update_fields).save()
logger.info(
"calcstep.log_formula updated after duplicate key: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
else:
# 如果还是找不到,记录错误但继续
logger.error(
"calcstep.log_formula failed to find record after duplicate key error: valuation_id={} formula_code={}",
valuation_id,
meta.code,
)
raise
else:
# 其他错误直接抛出
raise
return ValuationCalculationStepOut.model_validate(step)
async def update_calculation_step(self, step_id: int, update: dict) -> ValuationCalculationStepOut:
step = await self.step_model.filter(id=step_id).first()
if not step:
raise ValueError(f"calculation_step not found: {step_id}")
await step.update_from_dict(update).save()
logger.info(
"calcstep.update id={} fields={}",
step_id,
list(update.keys()),
)
return ValuationCalculationStepOut.model_validate(step)
async def get_calculation_steps(self, valuation_id: int) -> List[ValuationCalculationStepOut]:
"""
根据估值ID获取所有相关的计算步骤。
此方法从数据库中检索与特定估值ID关联的所有计算步骤记录
并按创建时间升序排序,确保步骤的顺序正确。
Args:
valuation_id (int): 估值的唯一标识符。
Returns:
List[ValuationCalculationStepOut]: 一个包含所有相关计算步骤的列表,
如果找不到任何步骤,则返回空列表。
"""
steps = await self.step_model.filter(valuation_id=valuation_id).order_by('created_at')
logger.info("calcstep.list valuation_id={} count={}", valuation_id, len(steps))
return [ValuationCalculationStepOut.model_validate(step) for step in steps]
async def get_calculation_report_markdown(self, valuation_id: int) -> str:
"""
根据估值ID生成计算过程的 Markdown 报告。
此方法会查询所有相关的计算步骤,按照公式顺序组织,
并生成格式化的 Markdown 文档,包含:
- 公式名称
- 输入参数
- 公式文本
- 输出结果
Args:
valuation_id (int): 估值的唯一标识符。
Returns:
str: Markdown 格式的计算报告。
Raises:
ValueError: 如果找不到对应的估值记录。
"""
# 验证估值记录是否存在
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
raise ValueError(f"估值记录不存在: {valuation_id}")
# 获取所有计算步骤,按 step_order 排序
steps = await self.step_model.filter(valuation_id=valuation_id).order_by('step_order')
if not steps:
return f"# 计算摘要\n\n**估值ID**: {valuation_id}\n\n**资产名称**: {valuation.asset_name}\n\n> 暂无计算步骤记录。\n"
# 转换为字典列表,便于处理
steps_data = []
for step in steps:
step_dict = ValuationCalculationStepOut.model_validate(step).model_dump()
steps_data.append(step_dict)
# 生成 Markdown
markdown = self._generate_markdown(valuation, steps_data)
logger.info("calcstep.report_markdown generated valuation_id={} steps_count={}", valuation_id, len(steps_data))
return markdown
def _build_formula_tree(self, steps: List[Dict]) -> Dict:
"""
构建公式的树形结构。
Args:
steps: 计算步骤列表。
Returns:
Dict: 树形结构的字典key 为 formula_codevalue 为步骤数据和子节点。
"""
# 按 formula_code 索引
step_map = {}
for step in steps:
code = step.get('formula_code')
if code:
step_map[code] = step
# 构建树形结构
tree = {}
processed = set()
# 第一遍:创建所有节点
for step in steps:
code = step.get('formula_code')
if not code or code in processed:
continue
node = {
'step': step,
'children': []
}
tree[code] = node
processed.add(code)
# 第二遍:建立父子关系
root_nodes = []
for step in steps:
code = step.get('formula_code')
if not code:
continue
parent_code = step.get('parent_formula_code')
node = tree[code]
if parent_code and parent_code in tree:
# 有父节点,添加到父节点的 children
tree[parent_code]['children'].append(node)
else:
# 根节点
root_nodes.append(node)
# 按 step_order 排序
def sort_nodes(nodes):
nodes.sort(key=lambda n: float(n['step'].get('step_order', 0)))
for node in nodes:
if node['children']:
sort_nodes(node['children'])
sort_nodes(root_nodes)
return {'roots': root_nodes, 'all': tree}
def _generate_markdown(self, valuation, steps_data: List[Dict]) -> str:
"""
生成 Markdown 格式的报告。
Args:
valuation: 估值评估对象。
steps_data: 计算步骤列表(已按 step_order 排序)。
Returns:
str: Markdown 格式的字符串。
"""
lines = []
# 标题
lines.append("# 计算摘要")
lines.append("")
lines.append("")
# 遍历所有步骤,按顺序生成
for step in steps_data:
name = step.get('formula_name', step.get('step_name', '未知'))
formula_text = step.get('formula_text', step.get('step_description', ''))
input_params = step.get('input_params')
output_result = step.get('output_result')
# 公式标题(二级标题)
lines.append(f"## {name}")
lines.append("")
# 参数部分
if input_params:
lines.append("**参数:**")
lines.append("")
# 格式化参数显示
param_lines = self._format_params(input_params)
lines.extend(param_lines)
lines.append("")
# 公式部分
if formula_text:
lines.append("**公式:**")
lines.append("")
lines.append("```")
lines.append(formula_text)
lines.append("```")
lines.append("")
# 结果部分
if output_result:
# 提取主要结果值
result_value = self._extract_main_result(output_result, name)
if result_value is not None:
lines.append("**结果:**")
lines.append("")
lines.append(f"`{result_value}`")
lines.append("")
lines.append("")
return "\n".join(lines)
def _format_params(self, params: Dict[str, Any]) -> List[str]:
"""
格式化参数显示,优先使用列表格式(如果是数组),否则显示为列表项。
参数名会附带中文说明(如果存在)。
Args:
params: 参数字典
Returns:
List[str]: 格式化后的参数行列表
"""
lines = []
def _get_param_label(key: str) -> str:
"""获取参数标签,包含中文说明"""
description = self.PARAM_DESCRIPTIONS.get(key)
if description:
return f"{key}{description}"
return key
# 如果参数只有一个键,且值是数组,直接显示数组(不带参数名,符合示例格式)
if len(params) == 1:
key, value = next(iter(params.items()))
if isinstance(value, (list, tuple)):
# 格式化为列表:- [12.2, 13.2, 14.2]
value_str = json.dumps(list(value), ensure_ascii=False)
lines.append(f"- {value_str}")
return lines
# 多个参数或非数组,显示为列表项(带说明)
for key, value in params.items():
param_label = _get_param_label(key)
if isinstance(value, (list, tuple)):
value_str = json.dumps(list(value), ensure_ascii=False)
lines.append(f"- **{param_label}**: {value_str}")
elif isinstance(value, dict):
value_str = json.dumps(value, ensure_ascii=False)
lines.append(f"- **{param_label}**: {value_str}")
else:
lines.append(f"- **{param_label}**: {value}")
return lines
def _extract_main_result(self, output_result: Dict[str, Any], formula_name: str) -> Optional[str]:
"""
从输出结果中提取主要结果值。
优先顺序:
1. 如果结果中只有一个数值类型的值,返回该值
2. 如果结果中包含与公式名称相关的字段(如 "财务价值 F" -> "financial_value_f"),返回该值
3. 如果结果中包含常见的计算结果字段(如 "result", "value", "output"),返回该值
4. 返回第一个数值类型的值
Args:
output_result: 输出结果字典
formula_name: 公式名称
Returns:
Optional[str]: 主要结果值的字符串表示,如果找不到则返回 None
"""
if not output_result or not isinstance(output_result, dict):
return None
# 移除 duration_ms 等元数据字段
filtered_result = {k: v for k, v in output_result.items()
if k not in ['duration_ms', 'duration', 'timestamp', 'status']}
if not filtered_result:
return None
# 如果只有一个值,直接返回
if len(filtered_result) == 1:
value = next(iter(filtered_result.values()))
if isinstance(value, (int, float)):
return str(value)
elif isinstance(value, (list, tuple)) and len(value) == 1:
return str(value[0])
else:
return json.dumps(value, ensure_ascii=False)
# 尝试根据公式名称匹配字段
# 例如:"财务价值 F" -> 查找 "financial_value_f", "财务价值F" 等
# 提取公式名称中的关键部分(通常是最后一个字母或单词)
name_parts = formula_name.split()
if name_parts:
# 获取最后一个部分(通常是字母,如 "F", "L", "D"
last_part = name_parts[-1].lower()
# 构建可能的字段名:如 "financial_value_f", "legal_strength_l" 等
# 将中文名称转换为可能的英文字段名模式
possible_keys = []
# 1. 直接匹配包含最后部分的字段(如包含 "f", "l", "d"
for key in filtered_result.keys():
if last_part in key.lower() or key.lower().endswith(f"_{last_part}"):
possible_keys.append(key)
# 2. 尝试匹配常见的命名模式
# 例如:"财务价值 F" -> "financial_value_f"
# 这里我们尝试匹配以最后部分结尾的字段
suffix_patterns = [
f"_{last_part}",
f"_{last_part}_",
last_part,
]
for key in filtered_result.keys():
key_lower = key.lower()
for pattern in suffix_patterns:
if key_lower.endswith(pattern) or pattern in key_lower:
if key not in possible_keys:
possible_keys.append(key)
# 按优先级匹配
for key in possible_keys:
if key in filtered_result:
value = filtered_result[key]
if isinstance(value, (int, float)):
return str(value)
# 查找常见的结果字段
common_result_keys = ['result', 'value', 'output', 'final_value', 'calculated_value']
for key in common_result_keys:
if key in filtered_result:
value = filtered_result[key]
if isinstance(value, (int, float)):
return str(value)
# 返回第一个数值类型的值
for key, value in filtered_result.items():
if isinstance(value, (int, float)):
return str(value)
# 如果都没有,返回整个结果的 JSON但简化显示
return json.dumps(filtered_result, ensure_ascii=False)
async def create(self, data: ValuationAssessmentCreate, user_id: int) -> ValuationAssessmentOut:
"""创建估值评估"""
# 将用户ID添加到数据中
create_data = data.model_dump()
create_data['user_id'] = user_id
valuation = await self.model.create(**create_data)
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def get_by_id(self, valuation_id: int) -> Optional[ValuationAssessmentOut]:
"""根据ID获取估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if valuation:
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
return None
async def update(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
"""更新估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
update_data = data.model_dump(exclude_unset=True)
if update_data:
if 'certificate_url' in update_data and update_data.get('certificate_url'):
from datetime import datetime
update_data['audited_at'] = datetime.now()
update_data['updated_at'] = datetime.now()
else:
from datetime import datetime
update_data['updated_at'] = datetime.now()
await valuation.update_from_dict(update_data)
await valuation.save()
from datetime import datetime
valuation.status ="pending"
if not getattr(valuation, "audited_at", None):
valuation.audited_at = datetime.now()
valuation.updated_at = datetime.now()
await valuation.save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def update1(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
"""更新估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
update_data = data.model_dump(exclude_unset=True)
if update_data:
if 'certificate_url' in update_data and update_data.get('certificate_url'):
from datetime import datetime
update_data['audited_at'] = datetime.now()
update_data['updated_at'] = datetime.now()
else:
from datetime import datetime
update_data['updated_at'] = datetime.now()
await valuation.update_from_dict(update_data)
await valuation.save()
from datetime import datetime
valuation.status ="success"
if not getattr(valuation, "audited_at", None):
valuation.audited_at = datetime.now()
valuation.updated_at = datetime.now()
await valuation.save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def delete(self, valuation_id: int) -> bool:
"""软删除估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return False
valuation.is_active = False
await valuation.save()
return True
async def get_list(self, query: ValuationAssessmentQuery) -> ValuationAssessmentList:
"""获取估值评估列表"""
queryset = self._build_query(query)
# 计算总数
total = await queryset.count()
# 分页查询
offset = (query.page - 1) * query.size
valuations = await queryset.offset(offset).limit(query.size).order_by('-created_at')
# 转换为输出模型
items = [ValuationAssessmentOut.model_validate(v) for v in valuations]
items = await self._attach_user_phone_bulk(items)
# 计算总页数
pages = (total + query.size - 1) // query.size
return ValuationAssessmentList(
items=items,
total=total,
page=query.page,
size=query.size,
pages=pages
)
def _build_query(self, query: ValuationAssessmentQuery) -> QuerySet:
"""构建查询条件"""
queryset = self.model.filter(is_active=True)
if query.asset_name:
queryset = queryset.filter(asset_name__icontains=query.asset_name)
if query.institution:
queryset = queryset.filter(institution__icontains=query.institution)
if query.industry:
queryset = queryset.filter(industry__icontains=query.industry)
if query.heritage_level:
queryset = queryset.filter(heritage_level__icontains=query.heritage_level)
if query.is_active is not None:
queryset = queryset.filter(is_active=query.is_active)
# 添加状态筛选
if hasattr(query, 'status') and query.status:
queryset = queryset.filter(status=query.status)
if getattr(query, 'phone', None):
queryset = queryset.filter(user__phone__icontains=query.phone)
def _parse_time(v: Optional[str]):
if not v:
return None
try:
iv = int(v)
from datetime import datetime
return datetime.fromtimestamp(iv / 1000)
except Exception:
try:
from datetime import datetime
return datetime.fromisoformat(v)
except Exception:
return None
s_dt = _parse_time(getattr(query, 'submitted_start', None))
e_dt = _parse_time(getattr(query, 'submitted_end', None))
if s_dt:
queryset = queryset.filter(created_at__gte=s_dt)
if e_dt:
queryset = queryset.filter(created_at__lte=e_dt)
a_s_dt = _parse_time(getattr(query, 'audited_start', None))
a_e_dt = _parse_time(getattr(query, 'audited_end', None))
if a_s_dt:
queryset = queryset.filter(updated_at__gte=a_s_dt)
if a_e_dt:
queryset = queryset.filter(updated_at__lte=a_e_dt)
return queryset
async def get_statistics(self) -> dict:
"""获取统计信息"""
total_count = await self.model.filter(is_active=True).count()
# 按行业统计
industry_stats = await self.model.filter(is_active=True).group_by('industry').annotate(count=Count('id')).values('industry', 'count')
# 按非遗等级统计
heritage_level_stats = await self.model.filter(
is_active=True,
heritage_level__isnull=False
).group_by('heritage_level').annotate(count=Count('id')).values('heritage_level', 'count')
return {
'total_count': total_count,
'industry_distribution': industry_stats,
'heritage_level_distribution': heritage_level_stats
}
async def search(self, keyword: str, page: int = 1, size: int = 10) -> ValuationAssessmentList:
"""全文搜索"""
queryset = self.model.filter(
Q(asset_name__icontains=keyword) |
Q(institution__icontains=keyword) |
Q(industry__icontains=keyword) |
Q(heritage_level__icontains=keyword),
is_active=True
)
# 计算总数
total = await queryset.count()
# 分页查询
offset = (page - 1) * size
valuations = await queryset.offset(offset).limit(size).order_by('-created_at')
# 转换为输出模型
items = [ValuationAssessmentOut.model_validate(v) for v in valuations]
items = await self._attach_user_phone_bulk(items)
# 计算总页数
pages = (total + size - 1) // size
return ValuationAssessmentList(
items=items,
total=total,
page=page,
size=size,
pages=pages
)
async def approve_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
"""审核通过估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
from datetime import datetime
update_data = {"status": "success", "audited_at": datetime.now(), "updated_at": datetime.now()}
if admin_notes:
update_data["admin_notes"] = admin_notes
await valuation.update_from_dict(update_data).save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def reject_valuation(self, valuation_id: int, admin_notes: Optional[str] = None) -> Optional[ValuationAssessmentOut]:
"""审核拒绝估值评估"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
from datetime import datetime
update_data = {"status": "rejected", "audited_at": datetime.now(), "updated_at": datetime.now()}
if admin_notes:
update_data["admin_notes"] = admin_notes
await valuation.update_from_dict(update_data).save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def update_admin_notes(self, valuation_id: int, admin_notes: str) -> Optional[ValuationAssessmentOut]:
"""更新管理员备注"""
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
from datetime import datetime
await valuation.update_from_dict({"admin_notes": admin_notes, "updated_at": datetime.now()}).save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def update_calc(self, valuation_id: int, data: ValuationAssessmentUpdate) -> Optional[ValuationAssessmentOut]:
valuation = await self.model.filter(id=valuation_id, is_active=True).first()
if not valuation:
return None
update_data = data.model_dump(exclude_unset=True)
valuation.status ="pending"
if update_data:
await valuation.update_from_dict(update_data)
await valuation.save()
out = ValuationAssessmentOut.model_validate(valuation)
return await self._attach_user_phone(out)
async def _attach_user_phone(self, out: ValuationAssessmentOut) -> ValuationAssessmentOut:
user = await AppUser.filter(id=out.user_id).first()
out.user_phone = getattr(user, "phone", None) if user else None
return out
async def _attach_user_phone_bulk(self, items: List[ValuationAssessmentOut]) -> List[ValuationAssessmentOut]:
ids = list({item.user_id for item in items if item.user_id})
if not ids:
return items
users = await AppUser.filter(id__in=ids).values("id", "phone")
phone_map = {u["id"]: u["phone"] for u in users}
for item in items:
item.user_phone = phone_map.get(item.user_id)
return items
# 创建控制器实例
valuation_controller = ValuationController()
from app.log import logger