guzhi/app/api/v1/valuations/valuations.py
邹方成 0021c94024 fix(valuations): 修正估值评估更新方法中的状态设置问题
修复valuation_controller.update方法中状态被硬编码为"pending"的问题,并添加update1方法用于不同的状态更新场景
同时更新dockerignore文件,添加web1/node_modules和migrations目录的忽略
优化发票抬头相关逻辑,包括空字符串处理和发票信息同步
调整发票抬头列表接口,支持分页和用户过滤
2025-11-27 18:01:12 +08:00

243 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, HTTPException, Query, Depends
from typing import Optional, List
from app.controllers.valuation import valuation_controller
from app.schemas.valuation import (
ValuationAssessmentCreate,
ValuationAssessmentUpdate,
ValuationAssessmentOut,
ValuationAssessmentList,
ValuationAssessmentQuery,
ValuationApprovalRequest,
ValuationAdminNotesUpdate,
ValuationCalculationStepOut
)
from app.schemas.base import Success, SuccessExtra, BasicResponse, PageResponse
from app.core.ctx import CTX_USER_ID
valuations_router = APIRouter(tags=["估值评估"])
@valuations_router.post("/", summary="创建估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def create_valuation(data: ValuationAssessmentCreate):
"""创建新的估值评估记录"""
try:
# 获取当前用户ID
user_id = CTX_USER_ID.get()
print(user_id)
result = await valuation_controller.create(data, user_id)
import json
return Success(data=json.loads(result.model_dump_json()), msg="创建成功")
except Exception as e:
raise HTTPException(status_code=400, detail=f"创建失败: {str(e)}")
@valuations_router.get("/statistics/overview", summary="获取统计信息", response_model=BasicResponse[dict])
async def get_statistics():
"""获取估值评估统计信息"""
result = await valuation_controller.get_statistics()
return Success(data=result, msg="获取统计信息成功")
@valuations_router.get("/{valuation_id}", summary="获取估值评估详情", response_model=BasicResponse[ValuationAssessmentOut])
async def get_valuation(valuation_id: int):
"""根据ID获取估值评估详情"""
result = await valuation_controller.get_by_id(valuation_id)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
import json
return Success(data=json.loads(result.model_dump_json()), msg="获取成功")
@valuations_router.get("/{valuation_id}/steps", summary="获取估值计算步骤", response_model=BasicResponse[List[ValuationCalculationStepOut]])
async def get_valuation_steps(valuation_id: int):
"""根据估值ID获取所有计算步骤"""
steps = await valuation_controller.get_calculation_steps(valuation_id)
if not steps:
raise HTTPException(status_code=404, detail="未找到该估值的计算步骤")
import json
steps_out = [json.loads(step.model_dump_json()) for step in steps]
return Success(data=steps_out, msg="获取计算步骤成功")
@valuations_router.get("/{valuation_id}/report", summary="获取估值计算报告Markdown格式")
async def get_valuation_report(valuation_id: int):
"""
根据估值ID生成计算过程的 Markdown 报告
返回格式化的 Markdown 文档,包含:
- 估值基本信息
- 计算结果摘要
- 详细计算过程(按公式层级组织)
- 每个公式的输入参数、输出结果、状态等信息
"""
try:
markdown = await valuation_controller.get_calculation_report_markdown(valuation_id)
from fastapi import Response
return Response(
content=markdown,
media_type="text/markdown; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="valuation_report_{valuation_id}.md"'
}
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"生成报告失败: {str(e)}")
@valuations_router.put("/{valuation_id}", summary="更新估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def update_valuation(valuation_id: int, data: ValuationAssessmentUpdate):
"""更新估值评估记录"""
result = await valuation_controller.update1(valuation_id, data)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
import json
return Success(data=json.loads(result.model_dump_json()), msg="更新成功")
@valuations_router.delete("/{valuation_id}", summary="删除估值评估", response_model=BasicResponse[dict])
async def delete_valuation(valuation_id: int):
"""软删除估值评估记录"""
result = await valuation_controller.delete(valuation_id)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
return Success(data={"deleted": True}, msg="删除成功")
@valuations_router.get("/", summary="获取估值评估列表", response_model=PageResponse[ValuationAssessmentOut])
async def get_valuations(
asset_name: Optional[str] = Query(None, description="资产名称"),
institution: Optional[str] = Query(None, description="所属机构"),
industry: Optional[str] = Query(None, description="所属行业"),
heritage_level: Optional[str] = Query(None, description="非遗等级"),
status: Optional[str] = Query(None, description="评估状态"),
is_active: Optional[bool] = Query(None, description="是否激活"),
phone: Optional[str] = Query(None, description="手机号模糊查询"),
submitted_start: Optional[str] = Query(None, description="提交时间开始毫秒或ISO"),
submitted_end: Optional[str] = Query(None, description="提交时间结束毫秒或ISO"),
audited_start: Optional[str] = Query(None, description="审核时间开始证书修改时间毫秒或ISO"),
audited_end: Optional[str] = Query(None, description="审核时间结束证书修改时间毫秒或ISO"),
page: int = Query(1, ge=1, description="页码"),
size: int = Query(10, ge=1, le=100, description="每页数量"),
page_size: Optional[int] = Query(None, alias="page_size", ge=1, le=100, description="每页数量")
):
"""获取估值评估列表,支持筛选和分页"""
query = ValuationAssessmentQuery(
asset_name=asset_name,
institution=institution,
industry=industry,
heritage_level=heritage_level,
status=status,
is_active=is_active,
phone=phone,
submitted_start=submitted_start,
submitted_end=submitted_end,
audited_start=audited_start,
audited_end=audited_end,
page=page,
size=page_size if page_size is not None else size
)
result = await valuation_controller.get_list(query)
import json
items = [json.loads(item.model_dump_json()) for item in result.items]
return SuccessExtra(
data=items,
total=result.total,
page=result.page,
page_size=result.size,
pages=result.pages,
msg="获取估值评估列表成功"
)
@valuations_router.get("/search/keyword", summary="搜索估值评估", response_model=PageResponse[ValuationAssessmentOut])
async def search_valuations(
keyword: str = Query(..., description="搜索关键词"),
page: int = Query(1, ge=1, description="页码"),
size: int = Query(10, ge=1, le=100, description="每页数量"),
page_size: Optional[int] = Query(None, alias="page_size", ge=1, le=100, description="每页数量")
):
"""根据关键词搜索估值评估记录"""
result = await valuation_controller.search(keyword, page, page_size if page_size is not None else size)
import json
items = [json.loads(item.model_dump_json()) for item in result.items]
return SuccessExtra(
data=items,
total=result.total,
page=result.page,
page_size=result.size,
pages=result.pages,
msg="搜索成功"
)
# 批量操作接口
@valuations_router.post("/batch/delete", summary="批量删除估值评估", response_model=BasicResponse[dict])
async def batch_delete_valuations(valuation_ids: list[int]):
"""批量软删除估值评估记录"""
success_count = 0
failed_ids = []
for valuation_id in valuation_ids:
result = await valuation_controller.delete(valuation_id)
if result:
success_count += 1
else:
failed_ids.append(valuation_id)
return Success(
data={
"success_count": success_count,
"failed_ids": failed_ids,
"total_count": len(valuation_ids)
},
msg=f"批量删除完成,成功删除 {success_count} 条记录"
)
# 导出接口
@valuations_router.get("/export/excel", summary="导出估值评估数据", response_model=BasicResponse[dict])
async def export_valuations(
asset_name: Optional[str] = Query(None, description="资产名称"),
institution: Optional[str] = Query(None, description="所属机构"),
industry: Optional[str] = Query(None, description="所属行业"),
heritage_level: Optional[str] = Query(None, description="非遗等级")
):
"""导出估值评估数据为Excel文件"""
# 这里可以实现Excel导出逻辑
# 暂时返回提示信息
return Success(data={"message": "导出功能待实现"}, msg="导出请求已接收")
# 审核管理接口
@valuations_router.post("/{valuation_id}/approve", summary="审核通过估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def approve_valuation(valuation_id: int, data: ValuationApprovalRequest):
"""审核通过估值评估"""
result = await valuation_controller.approve_valuation(valuation_id, data.admin_notes)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
import json
return Success(data=json.loads(result.model_dump_json()), msg="审核通过成功")
@valuations_router.post("/{valuation_id}/reject", summary="审核拒绝估值评估", response_model=BasicResponse[ValuationAssessmentOut])
async def reject_valuation(valuation_id: int, data: ValuationApprovalRequest):
"""审核拒绝估值评估"""
result = await valuation_controller.reject_valuation(valuation_id, data.admin_notes)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
import json
return Success(data=json.loads(result.model_dump_json()), msg="审核拒绝成功")
@valuations_router.put("/{valuation_id}/admin-notes", summary="更新管理员备注", response_model=BasicResponse[ValuationAssessmentOut])
async def update_admin_notes(valuation_id: int, data: ValuationAdminNotesUpdate):
"""更新管理员备注"""
result = await valuation_controller.update_admin_notes(valuation_id, data.admin_notes)
if not result:
raise HTTPException(status_code=404, detail="估值评估记录不存在")
import json
return Success(data=json.loads(result.model_dump_json()), msg="管理员备注更新成功")