guzhi/app/api/v1/app_invoices/app_invoices.py
邹方成 c690a95cab feat: 新增发票管理模块和用户端接口
refactor: 优化响应格式和错误处理

fix: 修复文件上传类型校验和删除无用PDF文件

perf: 添加估值评估审核时间字段和查询条件

docs: 更新Docker镜像版本至v1.8

test: 添加响应格式检查脚本

style: 统一API响应数据结构

chore: 清理无用静态文件和更新构建脚本
2025-11-24 16:39:53 +08:00

78 lines
3.8 KiB
Python

from fastapi import APIRouter, Query, Depends
from typing import Optional
from app.schemas.base import Success, SuccessExtra, BasicResponse, PageResponse
from app.schemas.invoice import InvoiceOut, InvoiceHeaderOut, InvoiceHeaderCreate, InvoiceHeaderUpdate
from app.controllers.invoice import invoice_controller
from app.utils.app_user_jwt import get_current_app_user
from app.models.user import AppUser
# Router that the app-user invoice and invoice-header endpoints in this module register on.
app_invoices_router = APIRouter(tags=["app-发票管理"])
# List the authenticated app user's invoices, one page at a time.
# The optional status / ticket_type / invoice_type query values are forwarded
# to the controller unchanged, together with the caller's id as app_user_id.
@app_invoices_router.get("/list", summary="我的发票列表", response_model=PageResponse[InvoiceOut])
async def get_my_invoices(
    status: Optional[str] = Query(None),
    ticket_type: Optional[str] = Query(None),
    invoice_type: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    current_user: AppUser = Depends(get_current_app_user),
):
    # Gather paging, filters and owner into one mapping for the controller call.
    query_kwargs = {
        "page": page,
        "page_size": page_size,
        "status": status,
        "ticket_type": ticket_type,
        "invoice_type": invoice_type,
        "app_user_id": current_user.id,
    }
    page_result = await invoice_controller.list(**query_kwargs)

    # Echo the paging metadata reported by the controller result.
    return SuccessExtra(
        data=page_result.items,
        total=page_result.total,
        page=page_result.page,
        page_size=page_result.page_size,
        msg="获取成功",
    )
# Return the invoice headers the controller reports for the authenticated app user.
@app_invoices_router.get("/headers", summary="我的发票抬头", response_model=BasicResponse[list[InvoiceHeaderOut]])
async def get_my_headers(current_user: AppUser = Depends(get_current_app_user)):
    owner_id = current_user.id
    my_headers = await invoice_controller.get_headers(user_id=owner_id)
    return Success(data=my_headers, msg="获取成功")
# Fetch a single invoice header by id for the authenticated app user.
# A header that does not exist and a header owned by another user both yield
# the same "未找到" response with empty data.
# NOTE(review): a header whose app_user_id is None (or missing) passes the
# ownership check below and is returned to any caller, while the update and
# delete handlers in this file require an exact owner match — confirm that
# owner-less headers are meant to be readable by everyone.
@app_invoices_router.get("/headers/{id}", summary="我的发票抬头详情", response_model=BasicResponse[InvoiceHeaderOut])
async def get_my_header_by_id(id: int, current_user: AppUser = Depends(get_current_app_user)):
    header = await invoice_controller.get_header_by_id(id)

    found = bool(header) and getattr(header, "id", None) is not None
    if found:
        # Only headers belonging to the caller (or with no owner set) are returned.
        owner_id = getattr(header, "app_user_id", None)
        if owner_id in (current_user.id, None):
            return Success(data=header, msg="获取成功")

    return Success(data={}, msg="未找到")
# Create a new invoice header for the authenticated app user from the request body.
@app_invoices_router.post("/headers", summary="新增我的发票抬头", response_model=BasicResponse[InvoiceHeaderOut])
async def create_my_header(data: InvoiceHeaderCreate, current_user: AppUser = Depends(get_current_app_user)):
    owner_id = current_user.id
    created = await invoice_controller.create_header(user_id=owner_id, data=data)
    return Success(data=created, msg="创建成功")
# Update one of the authenticated app user's invoice headers.
# The header is loaded first; if it is missing or its app_user_id differs from
# the caller's id, the handler answers "未找到" without calling the update.
@app_invoices_router.put("/headers/{id}", summary="更新我的发票抬头", response_model=BasicResponse[InvoiceHeaderOut])
async def update_my_header(id: int, data: InvoiceHeaderUpdate, current_user: AppUser = Depends(get_current_app_user)):
    current = await invoice_controller.get_header_by_id(id)

    exists = bool(current) and getattr(current, "id", None) is not None
    if not exists or getattr(current, "app_user_id", None) != current_user.id:
        return Success(data={}, msg="未找到")

    updated = await invoice_controller.update_header(id, data)
    if updated:
        return Success(data=updated, msg="更新成功")
    # A falsy controller result is reported as not found.
    return Success(data={}, msg="未找到")
# Delete one of the authenticated app user's invoice headers.
# The header is loaded first; if it is missing or its app_user_id differs from
# the caller's id, the handler answers "未找到" with deleted=False and never
# calls the delete. Otherwise the controller's result is returned as "deleted".
@app_invoices_router.delete("/headers/{id}", summary="删除我的发票抬头", response_model=BasicResponse[dict])
async def delete_my_header(id: int, current_user: AppUser = Depends(get_current_app_user)):
    target = await invoice_controller.get_header_by_id(id)

    exists = bool(target) and getattr(target, "id", None) is not None
    if not exists or getattr(target, "app_user_id", None) != current_user.id:
        return Success(data={"deleted": False}, msg="未找到")

    deleted = await invoice_controller.delete_header(id)
    if deleted:
        outcome_msg = "删除成功"
    else:
        outcome_msg = "未找到"
    return Success(data={"deleted": deleted}, msg=outcome_msg)