guzhi/app/api/v1/invoice/invoice.py
邹方成 552c02516a feat(发票): 支持多附件上传和邮件发送功能
refactor(用户管理): 优化用户列表查询和备注字段处理

feat(估值): 评估报告和证书URL改为数组类型并添加下载地址

docs: 添加交易管理与用户备注功能增强实施计划

fix(邮件): 修复邮件发送接口的多附件支持问题

style: 清理注释代码和格式化文件
2025-11-25 20:09:50 +08:00

173 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Query, Depends, Header, HTTPException
from typing import Optional
from app.schemas.base import Success, SuccessExtra, BasicResponse, PageResponse, MessageOut
from app.schemas.invoice import (
InvoiceCreate,
InvoiceUpdate,
UpdateStatus,
UpdateType,
InvoiceHeaderCreate,
InvoiceHeaderUpdate,
PaymentReceiptCreate,
AppCreateInvoiceWithReceipt,
InvoiceOut,
InvoiceList,
InvoiceHeaderOut,
PaymentReceiptOut,
)
from app.controllers.invoice import invoice_controller
from app.utils.app_user_jwt import get_current_app_user
from app.core.dependency import DependAuth, DependPermission
from app.models.user import AppUser
from app.models.invoice import InvoiceHeader
invoice_router = APIRouter(tags=["发票管理"])
@invoice_router.get("/list", summary="获取发票列表", response_model=PageResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def list_invoices(
phone: Optional[str] = Query(None),
company_name: Optional[str] = Query(None),
tax_number: Optional[str] = Query(None),
status: Optional[str] = Query(None),
ticket_type: Optional[str] = Query(None),
invoice_type: Optional[str] = Query(None),
user_id: Optional[int] = Query(None, description="按App用户ID过滤"),
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
):
"""
发票列表查询
参数支持按手机号、公司名称、税号、状态、发票类型进行筛选
返回分页结构
"""
result = await invoice_controller.list(
page=page,
page_size=page_size,
phone=phone,
company_name=company_name,
tax_number=tax_number,
status=status,
ticket_type=ticket_type,
invoice_type=invoice_type,
app_user_id=user_id,
)
return SuccessExtra(
data=result.items, total=result.total, page=result.page, page_size=result.page_size, msg="获取成功"
)
@invoice_router.get("/detail", summary="发票详情", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def invoice_detail(id: int = Query(...)):
"""
根据ID获取发票详情
"""
out = await invoice_controller.get_out(id)
if not out:
return Success(data={}, msg="未找到")
return Success(data=out, msg="获取成功")
@invoice_router.post("/create", summary="创建发票", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def create_invoice(data: InvoiceCreate):
"""
创建发票记录
"""
inv = await invoice_controller.create(data)
out = await invoice_controller.get_out(inv.id)
return Success(data=out, msg="创建成功")
@invoice_router.post("/update", summary="更新发票", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice(data: InvoiceUpdate, id: int = Query(...)):
"""
更新发票记录
"""
updated = await invoice_controller.update(id, data)
out = await invoice_controller.get_out(id) if updated else None
return Success(data=out or {}, msg="更新成功" if updated else "未找到")
@invoice_router.delete("/delete", summary="删除发票", response_model=BasicResponse[MessageOut], dependencies=[DependAuth, DependPermission])
async def delete_invoice(id: int = Query(...)):
"""
删除发票记录
"""
try:
await invoice_controller.remove(id)
ok = True
except Exception:
ok = False
return Success(data={"deleted": ok}, msg="删除成功" if ok else "未找到")
@invoice_router.post("/update-status", summary="更新发票状态", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_status(data: UpdateStatus):
"""
更新发票状态pending|invoiced|rejected|refunded
"""
out = await invoice_controller.update_status(data)
return Success(data=out or {}, msg="更新成功" if out else "未找到")
@invoice_router.post("/{id}/receipt", summary="上传付款凭证", response_model=BasicResponse[dict], dependencies=[DependAuth, DependPermission])
async def upload_payment_receipt(id: int, data: PaymentReceiptCreate):
"""
上传对公转账付款凭证
"""
receipt = await invoice_controller.create_receipt(id, data)
detail = await invoice_controller.get_receipt_by_id(receipt.id)
return Success(data=detail, msg="上传成功")
@invoice_router.get("/headers", summary="发票抬头列表", response_model=BasicResponse[list[InvoiceHeaderOut]], dependencies=[DependAuth, DependPermission])
async def get_invoice_headers(app_user_id: Optional[int] = Query(None)):
"""
管理端抬头列表管理员token允许按 app_user_id 过滤;为空则返回全部。
"""
headers = await invoice_controller.get_headers(user_id=app_user_id)
return Success(data=headers, msg="获取成功")
@invoice_router.get("/headers/{id}", summary="发票抬头详情", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def get_invoice_header_by_id(id: int):
"""
获取发票抬头详情
"""
header = await invoice_controller.get_header_by_id(id)
return Success(data=header or {}, msg="获取成功" if header else "未找到")
@invoice_router.post("/headers", summary="新增发票抬头", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def create_invoice_header(data: InvoiceHeaderCreate, app_user_id: Optional[int] = Query(None)):
"""
新增发票抬头
"""
header = await invoice_controller.create_header(user_id=app_user_id, data=data)
return Success(data=header, msg="创建成功")
@invoice_router.put("/{id}/type", summary="更新发票类型", response_model=BasicResponse[InvoiceOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_type(id: int, data: UpdateType):
"""
更新发票的电子/纸质与专票/普票类型
"""
out = await invoice_controller.update_type(id, data)
return Success(data=out or {}, msg="更新成功" if out else "未找到")
@invoice_router.delete("/headers/{id}", summary="删除发票抬头", response_model=BasicResponse[MessageOut], dependencies=[DependAuth, DependPermission])
async def delete_invoice_header(id: int):
ok = await invoice_controller.delete_header(id)
return Success(msg="删除成功" if ok else "未找到")
@invoice_router.put("/headers/{id}", summary="更新发票抬头", response_model=BasicResponse[InvoiceHeaderOut], dependencies=[DependAuth, DependPermission])
async def update_invoice_header(id: int, data: InvoiceHeaderUpdate):
header = await invoice_controller.update_header(id, data)
return Success(data=header or {}, msg="更新成功" if header else "未找到")