144 lines
7.6 KiB
Python
144 lines
7.6 KiB
Python
from fastapi import APIRouter, Query, Depends
|
|
from typing import Optional
|
|
|
|
from app.schemas.base import Success, SuccessExtra, BasicResponse, PageResponse
|
|
from app.schemas.invoice import InvoiceOut, InvoiceHeaderOut, InvoiceHeaderCreate, InvoiceHeaderUpdate, PaymentReceiptCreate, AppCreateInvoiceWithReceipt, InvoiceCreate
|
|
from app.controllers.invoice import invoice_controller
|
|
from app.utils.app_user_jwt import get_current_app_user
|
|
from app.models.user import AppUser
|
|
from app.models.invoice import InvoiceHeader
|
|
|
|
# Router that groups the invoice endpoints of this module; every handler
# registered on it below resolves the caller via get_current_app_user.
app_invoices_router = APIRouter(tags=["app-发票管理"])
|
|
|
|
|
|
# Paginated listing of the authenticated app user's invoices.
# The optional status / ticket_type / invoice_type query values are handed to
# the controller unchanged, together with the caller's id as app_user_id.
@app_invoices_router.get("/list", summary="我的发票列表", response_model=PageResponse[InvoiceOut])
async def get_my_invoices(
    status: Optional[str] = Query(None),
    ticket_type: Optional[str] = Query(None),
    invoice_type: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    current_user: AppUser = Depends(get_current_app_user),
):
    # Collect the optional filters once so the controller call stays compact.
    filters = {
        "status": status,
        "ticket_type": ticket_type,
        "invoice_type": invoice_type,
    }
    page_result = await invoice_controller.list(
        page=page,
        page_size=page_size,
        app_user_id=current_user.id,
        **filters,
    )
    # Echo the paging values reported by the controller, not the raw inputs.
    return SuccessExtra(
        msg="获取成功",
        data=page_result.items,
        total=page_result.total,
        page=page_result.page,
        page_size=page_result.page_size,
    )
|
|
|
|
|
|
# Return the invoice headers the controller yields for the current user's id
# (unpaginated).
@app_invoices_router.get("/headers", summary="我的发票抬头", response_model=BasicResponse[list[InvoiceHeaderOut]])
async def get_my_headers(current_user: AppUser = Depends(get_current_app_user)):
    owner_id = current_user.id
    my_headers = await invoice_controller.get_headers(user_id=owner_id)
    return Success(msg="获取成功", data=my_headers)
|
|
|
|
# Fetch a single invoice header by id for the authenticated app user.
#
# The path uses Starlette's ``int`` convertor so only numeric segments match
# this route. Routes are matched in registration order, and with a bare
# ``{id}`` the literal GET /headers/list route declared later in this module
# is captured here first and rejected by int validation, making the paged
# listing unreachable.
#
# Responds with an empty payload and the "not found" message when the header
# does not exist or belongs to a different app user.
@app_invoices_router.get("/headers/{id:int}", summary="我的发票抬头详情", response_model=BasicResponse[InvoiceHeaderOut])
async def get_my_header_by_id(id: int, current_user: AppUser = Depends(get_current_app_user)):
    header = await invoice_controller.get_header_by_id(id)
    # Treat both a falsy result and an object without an id as "missing".
    if not header or getattr(header, "id", None) is None:
        return Success(data={}, msg="未找到")
    # Reject headers owned by another app user. Headers whose app_user_id is
    # None pass this check.
    # NOTE(review): update/delete require an exact owner match; confirm that
    # owner-less headers are really meant to be readable by any app user.
    if getattr(header, "app_user_id", None) not in (current_user.id, None):
        return Success(data={}, msg="未找到")
    return Success(data=header, msg="获取成功")
|
|
|
|
# Create a new invoice header from the request body, passing the current
# user's id to the controller, and return the created header.
@app_invoices_router.post("/headers", summary="新增我的发票抬头", response_model=BasicResponse[InvoiceHeaderOut])
async def create_my_header(data: InvoiceHeaderCreate, current_user: AppUser = Depends(get_current_app_user)):
    owner_id = current_user.id
    created_header = await invoice_controller.create_header(user_id=owner_id, data=data)
    return Success(msg="创建成功", data=created_header)
|
|
|
|
# Update one of the caller's invoice headers.
# A missing header and a header owned by someone else are answered the same
# way (empty payload, "not found" message).
@app_invoices_router.put("/headers/{id}", summary="更新我的发票抬头", response_model=BasicResponse[InvoiceHeaderOut])
async def update_my_header(id: int, data: InvoiceHeaderUpdate, current_user: AppUser = Depends(get_current_app_user)):
    current = await invoice_controller.get_header_by_id(id)

    # Guard: the header must exist and be owned by the caller.
    is_missing = not current or getattr(current, "id", None) is None
    if is_missing or getattr(current, "app_user_id", None) != current_user.id:
        return Success(data={}, msg="未找到")

    updated = await invoice_controller.update_header(id, data)
    if updated:
        return Success(data=updated, msg="更新成功")
    # The controller returned nothing, so report it as not found.
    return Success(data={}, msg="未找到")
|
|
|
|
# Delete one of the caller's invoice headers.
# The payload always carries a "deleted" flag; it is False when the header is
# missing or owned by another user, otherwise whatever the controller returns.
@app_invoices_router.delete("/headers/{id}", summary="删除我的发票抬头", response_model=BasicResponse[dict])
async def delete_my_header(id: int, current_user: AppUser = Depends(get_current_app_user)):
    target = await invoice_controller.get_header_by_id(id)

    # Guard: the header must exist and be owned by the caller.
    is_missing = not target or getattr(target, "id", None) is None
    if is_missing or getattr(target, "app_user_id", None) != current_user.id:
        return Success(data={"deleted": False}, msg="未找到")

    deleted = await invoice_controller.delete_header(id)
    if deleted:
        message = "删除成功"
    else:
        message = "未找到"
    return Success(data={"deleted": deleted}, msg=message)
|
|
|
|
|
|
# Attach a payment receipt to one of the caller's invoices.
# The invoice is looked up by id AND app_user_id, so invoices of other users
# are reported as not found.
@app_invoices_router.post("/receipts/{id}", summary="上传我的付款凭证", response_model=BasicResponse[dict])
async def upload_my_receipt(id: int, data: PaymentReceiptCreate, current_user: AppUser = Depends(get_current_app_user)):
    owned_invoices = invoice_controller.model.filter(id=id, app_user_id=current_user.id)
    invoice = await owned_invoices.first()
    if not invoice:
        return Success(data={}, msg="未找到")

    # Create the receipt, then re-read it through the controller so the
    # response carries the controller's detail representation.
    created = await invoice_controller.create_receipt(id, data)
    receipt_detail = await invoice_controller.get_receipt_by_id(created.id)
    return Success(msg="上传成功", data=receipt_detail)
|
|
|
|
|
|
# Create an invoice from one of the caller's headers and optionally attach
# payment receipts in the same request.
#
# Response payload depends on what was supplied:
#   * receipt_urls (non-empty): {"invoice_id": ..., "receipts": [...]}
#   * receipt_url only:         the single receipt detail
#   * neither:                  the dumped invoice output (or {} if unavailable)
@app_invoices_router.post("/create-with-receipt", summary="创建我的发票并上传付款凭证", response_model=BasicResponse[dict])
async def create_with_receipt(payload: AppCreateInvoiceWithReceipt, current_user: AppUser = Depends(get_current_app_user)):
    # The header must belong to the caller.
    owned_header = await InvoiceHeader.filter(id=payload.header_id, app_user_id=current_user.id).first()
    if not owned_header:
        return Success(data={}, msg="抬头未找到")

    # Resolve the invoice type: explicit value first, then the index mapping,
    # and finally the "normal" default.
    resolved_type = payload.invoice_type
    if not resolved_type and payload.invoiceTypeIndex is not None:
        resolved_type = {"0": "normal", "1": "special"}.get(str(payload.invoiceTypeIndex))

    # Fields copied verbatim from the header onto the new invoice.
    copied_fields = (
        "email",
        "company_name",
        "tax_number",
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
    )
    invoice_payload = InvoiceCreate(
        ticket_type=payload.ticket_type or "electronic",
        invoice_type=resolved_type or "normal",
        phone=current_user.phone,
        app_user_id=current_user.id,
        header_id=owned_header.id,
        wechat=getattr(current_user, "alias", None),
        **{field: getattr(owned_header, field) for field in copied_fields},
    )
    invoice = await invoice_controller.create(invoice_payload)

    async def attach(url):
        # Store one receipt for the new invoice and return its detail view.
        created = await invoice_controller.create_receipt(
            invoice.id, PaymentReceiptCreate(url=url, note=payload.note)
        )
        return await invoice_controller.get_receipt_by_id(created.id)

    # A non-empty receipt_urls list takes precedence over receipt_url.
    if payload.receipt_urls:
        attached = []
        for url in payload.receipt_urls:
            receipt_detail = await attach(url)
            if receipt_detail:
                attached.append(receipt_detail)
        return Success(data={"invoice_id": invoice.id, "receipts": attached}, msg="创建并上传成功")

    if payload.receipt_url:
        single_detail = await attach(payload.receipt_url)
        return Success(data=single_detail, msg="创建并上传成功")

    # No receipt supplied: return the invoice itself.
    invoice_out = await invoice_controller.get_out(invoice.id)
    return Success(data=invoice_out.model_dump() if invoice_out else {}, msg="创建成功,未上传凭证")
|
|
# Paginated listing of the caller's invoice headers, newest first
# (ordered by created_at descending).
#
# The query is built directly on InvoiceHeader. An earlier probe of
# invoice_controller.model_header was removed: its result was always
# overwritten, so it never affected the response.
#
# NOTE(review): this literal path is registered after the parameterized
# /headers/{id} GET route; confirm that route ordering / path convertors let
# requests actually reach this handler.
@app_invoices_router.get("/headers/list", summary="我的抬头列表(分页)", response_model=PageResponse[InvoiceHeaderOut])
async def get_my_headers_paged(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    current_user: AppUser = Depends(get_current_app_user),
):
    owned = InvoiceHeader.filter(app_user_id=current_user.id)
    # Count before slicing so `total` reflects all of the user's headers.
    total = await owned.count()
    rows = await owned.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
    # Validate each ORM row through the output schema and emit plain dicts.
    data = [InvoiceHeaderOut.model_validate(row).model_dump() for row in rows]
    return SuccessExtra(data=data, total=total, page=page, page_size=page_size, msg="获取成功")
|