guzhi/app/api/v1/invoice/invoice.py
邹方成 c690a95cab feat: 新增发票管理模块和用户端接口
refactor: 优化响应格式和错误处理

fix: 修复文件上传类型校验和删除无用PDF文件

perf: 添加估值评估审核时间字段和查询条件

docs: 更新Docker镜像版本至v1.8

test: 添加响应格式检查脚本

style: 统一API响应数据结构

chore: 清理无用静态文件和更新构建脚本
2025-11-24 16:39:53 +08:00

204 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Query, Depends, Header, HTTPException
from typing import Optional
from app.schemas.base import Success, SuccessExtra, BasicResponse, PageResponse, MessageOut
from app.schemas.invoice import (
InvoiceCreate,
InvoiceUpdate,
UpdateStatus,
UpdateType,
InvoiceHeaderCreate,
InvoiceHeaderUpdate,
PaymentReceiptCreate,
InvoiceOut,
InvoiceList,
InvoiceHeaderOut,
PaymentReceiptOut,
)
from app.controllers.invoice import invoice_controller
from app.utils.app_user_jwt import get_current_app_user
from app.core.dependency import DependAuth, DependPermission
from app.models.user import AppUser
invoice_router = APIRouter(tags=["发票管理"])
@invoice_router.get("/list", summary="获取发票列表", response_model=PageResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def list_invoices(
phone: Optional[str] = Query(None),
company_name: Optional[str] = Query(None),
tax_number: Optional[str] = Query(None),
status: Optional[str] = Query(None),
ticket_type: Optional[str] = Query(None),
invoice_type: Optional[str] = Query(None),
user_id: Optional[int] = Query(None, description="按App用户ID过滤"),
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
):
"""
发票列表查询
参数支持按手机号、公司名称、税号、状态、发票类型进行筛选
返回分页结构
"""
result = await invoice_controller.list(
page=page,
page_size=page_size,
phone=phone,
company_name=company_name,
tax_number=tax_number,
status=status,
ticket_type=ticket_type,
invoice_type=invoice_type,
app_user_id=user_id,
)
return SuccessExtra(
data=result.items, total=result.total, page=result.page, page_size=result.page_size, msg="获取成功"
)
@invoice_router.get("/detail", summary="发票详情", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def invoice_detail(id: int = Query(...)):
"""
根据ID获取发票详情
"""
out = await invoice_controller.get_out(id)
if not out:
return Success(data={}, msg="未找到")
return Success(data=out, msg="获取成功")
@invoice_router.post("/create", summary="创建发票", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def create_invoice(data: InvoiceCreate):
"""
创建发票记录
"""
inv = await invoice_controller.create(data)
out = await invoice_controller.get_out(inv.id)
return Success(data=out, msg="创建成功")
@invoice_router.post("/update", summary="更新发票", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice(data: InvoiceUpdate, id: int = Query(...)):
"""
更新发票记录
"""
updated = await invoice_controller.update(id, data)
out = await invoice_controller.get_out(id) if updated else None
return Success(data=out or {}, msg="更新成功" if updated else "未找到")
@invoice_router.delete("/delete", summary="删除发票", response_model=BasicResponse[MessageOut], dependencies=[DependAuth, DependPermission])
async def delete_invoice(id: int = Query(...)):
"""
删除发票记录
"""
try:
await invoice_controller.remove(id)
ok = True
except Exception:
ok = False
return Success(data={"deleted": ok}, msg="删除成功" if ok else "未找到")
@invoice_router.post("/update-status", summary="更新发票状态", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_status(data: UpdateStatus):
"""
更新发票状态pending|invoiced|rejected|refunded
"""
out = await invoice_controller.update_status(data)
return Success(data=out or {}, msg="更新成功" if out else "未找到")
@invoice_router.post("/{id}/receipt", summary="上传付款凭证", response_model=BasicResponse[dict], dependencies=[DependAuth, DependPermission])
async def upload_payment_receipt(id: int, data: PaymentReceiptCreate):
"""
上传对公转账付款凭证
"""
receipt = await invoice_controller.create_receipt(id, data)
detail = await invoice_controller.get_receipt_by_id(receipt.id)
return Success(data=detail, msg="上传成功")
@invoice_router.get("/headers", summary="发票抬头列表", response_model=BasicResponse[list[InvoiceHeaderOut]], dependencies=[DependAuth, DependPermission])
async def get_invoice_headers(app_user_id: Optional[int] = Query(None)):
"""
管理端抬头列表管理员token允许按 app_user_id 过滤;为空则返回全部。
"""
headers = await invoice_controller.get_headers(user_id=app_user_id)
return Success(data=headers, msg="获取成功")
@invoice_router.get("/headers/{id}", summary="发票抬头详情", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def get_invoice_header_by_id(id: int):
"""
获取发票抬头详情
"""
header = await invoice_controller.get_header_by_id(id)
return Success(data=header or {}, msg="获取成功" if header else "未找到")
@invoice_router.post("/headers", summary="新增发票抬头", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def create_invoice_header(data: InvoiceHeaderCreate, app_user_id: Optional[int] = Query(None)):
"""
新增发票抬头
"""
header = await invoice_controller.create_header(user_id=app_user_id, data=data)
return Success(data=header, msg="创建成功")
@invoice_router.put("/{id}/type", summary="更新发票类型", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_type(id: int, data: UpdateType):
"""
更新发票的电子/纸质与专票/普票类型
"""
out = await invoice_controller.update_type(id, data)
return Success(data=out or {}, msg="更新成功" if out else "未找到")
@invoice_router.delete("/headers/{id}", summary="删除发票抬头", response_model=BasicResponse[MessageOut], dependencies=[DependAuth, DependPermission])
async def delete_invoice_header(id: int):
ok = await invoice_controller.delete_header(id)
return Success(msg="删除成功" if ok else "未找到")
@invoice_router.put("/headers/{id}", summary="更新发票抬头", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_header(id: int, data: InvoiceHeaderUpdate):
header = await invoice_controller.update_header(id, data)
return Success(data=header or {}, msg="更新成功" if header else "未找到")
# 用户端我的发票列表使用App用户token
@invoice_router.get("/app-list", summary="我的发票列表", response_model=PageResponse[InvoiceOut])
async def list_my_invoices(
status: Optional[str] = Query(None),
ticket_type: Optional[str] = Query(None),
invoice_type: Optional[str] = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
current_user: AppUser = Depends(get_current_app_user),
):
result = await invoice_controller.list(
page=page,
page_size=page_size,
status=status,
ticket_type=ticket_type,
invoice_type=invoice_type,
app_user_id=current_user.id,
)
return SuccessExtra(
data=result.items,
total=result.total,
page=result.page,
page_size=result.page_size,
msg="获取成功",
)
# 用户端我的发票抬头使用App用户token
@invoice_router.get("/app-headers", summary="我的发票抬头", response_model=BasicResponse[list[InvoiceHeaderOut]])
async def get_my_invoice_headers(current_user: AppUser = Depends(get_current_app_user)):
headers = await invoice_controller.get_headers(user_id=current_user.id)
return Success(data=headers, msg="获取成功")