guzhi/app/controllers/invoice.py
2025-12-04 14:44:23 +08:00

382 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from typing import Optional, List

from tortoise.queryset import QuerySet

from app.core.crud import CRUDBase
from app.models.invoice import Invoice, InvoiceHeader, PaymentReceipt
from app.schemas.invoice import (
    InvoiceCreate,
    InvoiceUpdate,
    InvoiceOut,
    InvoiceList,
    InvoiceHeaderCreate,
    InvoiceHeaderUpdate,
    InvoiceHeaderOut,
    UpdateStatus,
    UpdateType,
    PaymentReceiptCreate,
    PaymentReceiptOut,
)
class InvoiceController(CRUDBase[Invoice, InvoiceCreate, InvoiceUpdate]):
    """Invoice controller.

    Groups the database operations for invoice headers (``InvoiceHeader``),
    invoices (``Invoice``) and payment receipts (``PaymentReceipt``).

    NOTE: the public method ``list`` shadows the builtin ``list`` inside the
    class body, so annotations in this class use ``typing.List``.
    """

    # Header fields stored as "" instead of None when a header is created.
    _HEADER_BLANK_FIELDS = (
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
    )

    # Header fields copied onto every invoice that references the header.
    _HEADER_SYNC_FIELDS = (
        "company_name",
        "tax_number",
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
        "email",
    )

    # (filter keyword, ORM lookup) pairs honoured by ``list``.
    _INVOICE_FILTERS = (
        ("phone", "phone__icontains"),
        ("company_name", "company_name__icontains"),
        ("tax_number", "tax_number__icontains"),
        ("status", "status"),
        ("ticket_type", "ticket_type"),
        ("invoice_type", "invoice_type"),
        ("app_user_id", "app_user_id"),
    )

    # (filter keyword, ORM lookup) pairs honoured by ``list_receipts``;
    # every lookup goes through the receipt's related invoice.
    _RECEIPT_FILTERS = (
        ("phone", "invoice__phone__icontains"),
        ("wechat", "invoice__wechat__icontains"),
        ("company_name", "invoice__company_name__icontains"),
        ("tax_number", "invoice__tax_number__icontains"),
        ("status", "invoice__status"),
        ("ticket_type", "invoice__ticket_type"),
        ("invoice_type", "invoice__invoice_type"),
    )

    # Invoice attributes flattened into a receipt record, in output order.
    _RECEIPT_INVOICE_FIELDS = (
        "phone",
        "wechat",
        "company_name",
        "tax_number",
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
        "email",
        "ticket_type",
        "invoice_type",
        "status",
    )

    def __init__(self):
        super().__init__(model=Invoice)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _invoice_to_out(invoice) -> InvoiceOut:
        """Convert an ``Invoice`` row into its ``InvoiceOut`` schema."""
        return InvoiceOut(
            id=invoice.id,
            created_at=invoice.created_at.isoformat() if invoice.created_at else "",
            ticket_type=invoice.ticket_type,
            invoice_type=invoice.invoice_type,
            phone=invoice.phone,
            email=invoice.email,
            company_name=invoice.company_name,
            tax_number=invoice.tax_number,
            register_address=invoice.register_address,
            register_phone=invoice.register_phone,
            bank_name=invoice.bank_name,
            bank_account=invoice.bank_account,
            status=invoice.status,
            app_user_id=invoice.app_user_id,
            header_id=invoice.header_id,
            wechat=invoice.wechat,
        )

    @staticmethod
    def _parse_time(value) -> Optional[datetime]:
        """Parse a millisecond timestamp or an ISO-8601 string.

        Returns:
            A naive ``datetime``, or None when ``value`` is neither an
            integer-like millisecond timestamp nor an ISO string.
        """
        try:
            # The front end may send millisecond timestamps.
            return datetime.fromtimestamp(int(value) / 1000)
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _receipt_urls(receipt) -> List[str]:
        """Collect the image URLs attached to a receipt.

        ``receipt.extra`` may be a list of URLs or a dict holding a list under
        ``"urls"``; when neither yields anything, ``receipt.url`` is used.
        """
        extra = receipt.extra
        candidates = []
        if isinstance(extra, list):
            candidates = extra
        elif isinstance(extra, dict):
            nested = extra.get("urls")
            if isinstance(nested, list):
                candidates = nested
        urls = [str(u) for u in candidates if u]
        if not urls and receipt.url:
            urls = [receipt.url]
        return urls

    @staticmethod
    async def _load_invoice(receipt):
        """Return the invoice related to ``receipt`` (None when there is none)."""
        related = receipt.invoice
        # A prefetched relation may already be a plain None, which cannot be
        # awaited; anything else is awaited exactly as before.
        if related is None:
            return None
        return await related

    def _receipt_to_dict(self, receipt, invoice) -> dict:
        """Flatten a receipt and its invoice into one plain dict.

        Invoice-derived values are None when ``invoice`` is None, matching the
        already None-tolerant ``invoice_id`` entry.
        """
        updated_at = getattr(receipt, "updated_at", None)
        record = {
            "id": receipt.id,
            "invoice_id": getattr(invoice, "id", None),
            "submitted_at": receipt.created_at.isoformat() if receipt.created_at else "",
            "receipt_uploaded_at": updated_at.isoformat() if updated_at else "",
            "extra": receipt.extra,
            # One entry per URL; id/note/verified are those of the receipt row.
            "receipts": [
                {
                    "id": receipt.id,
                    "url": url,
                    "note": receipt.note,
                    "verified": receipt.verified,
                }
                for url in self._receipt_urls(receipt)
            ],
        }
        for field in self._RECEIPT_INVOICE_FIELDS:
            record[field] = getattr(invoice, field) if invoice is not None else None
        return record

    # ------------------------------------------------------------------
    # Invoice headers
    # ------------------------------------------------------------------

    async def create_header(self, user_id: Optional[int], data: InvoiceHeaderCreate) -> InvoiceHeaderOut:
        """Create an invoice header.

        Args:
            user_id: ID of the owning AppUser (optional).
            data: Header creation payload.

        Returns:
            The created header as ``InvoiceHeaderOut``.
        """
        payload = data.model_dump()
        for field in self._HEADER_BLANK_FIELDS:
            if payload.get(field) is None:
                payload[field] = ""
        # A new default header replaces the user's previous default.
        if payload.get("is_default") and user_id is not None:
            await InvoiceHeader.filter(app_user_id=user_id).update(is_default=False)
        header = await InvoiceHeader.create(app_user_id=user_id, **payload)
        return InvoiceHeaderOut.model_validate(header)

    async def get_headers(self, user_id: Optional[int] = None) -> List[InvoiceHeaderOut]:
        """List invoice headers, newest first.

        Args:
            user_id: When given, only headers of this AppUser are returned.

        Returns:
            The matching headers as ``InvoiceHeaderOut`` objects.
        """
        qs = InvoiceHeader.all()
        if user_id is not None:
            qs = qs.filter(app_user_id=user_id)
        headers = await qs.order_by("-created_at")
        return [InvoiceHeaderOut.model_validate(h) for h in headers]

    async def get_header_by_id(self, id_: int) -> Optional[InvoiceHeaderOut]:
        """Fetch one header by ID.

        Args:
            id_: Header ID.

        Returns:
            ``InvoiceHeaderOut``, or None when no such header exists.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        return InvoiceHeaderOut.model_validate(header) if header else None

    async def delete_header(self, id_: int) -> bool:
        """Delete a header by ID.

        Args:
            id_: Header ID.

        Returns:
            True when a header was deleted, False when none was found.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        if not header:
            return False
        await header.delete()
        return True

    async def update_header(self, id_: int, data: InvoiceHeaderUpdate) -> Optional[InvoiceHeaderOut]:
        """Update a header and propagate its fields to referencing invoices.

        Args:
            id_: Header ID.
            data: Partial update; only explicitly set fields are applied.

        Returns:
            The updated header, or None when no such header exists.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        if not header:
            return None
        update_data = data.model_dump(exclude_unset=True)
        if update_data:
            # Promoting this header to default demotes the user's other headers.
            if update_data.get("is_default") and header.app_user_id is not None:
                await (
                    InvoiceHeader.filter(app_user_id=header.app_user_id)
                    .exclude(id=header.id)
                    .update(is_default=False)
                )
            await header.update_from_dict(update_data).save()
            # Keep the basic information of invoices using this header in sync.
            # NOTE(review): block nesting was reconstructed from a source whose
            # indentation was lost - confirm the sync belongs inside this branch.
            sync_fields = {name: getattr(header, name) for name in self._HEADER_SYNC_FIELDS}
            await Invoice.filter(header_id=header.id).update(**sync_fields)
        return InvoiceHeaderOut.model_validate(header)

    # ------------------------------------------------------------------
    # Invoices
    # ------------------------------------------------------------------

    async def list(self, page: int = 1, page_size: int = 10, **filters) -> InvoiceList:
        """List invoices with filtering and pagination, newest first.

        Args:
            page: 1-based page number.
            page_size: Number of rows per page.
            **filters: Optional ``phone``, ``company_name``, ``tax_number``
                (substring, case-insensitive) and ``status``, ``ticket_type``,
                ``invoice_type``, ``app_user_id`` (exact). Falsy values are
                ignored. No time-range filter is applied here.

        Returns:
            ``InvoiceList`` with the page of items and the total match count.
        """
        qs: QuerySet = self.model.all()
        for key, lookup in self._INVOICE_FILTERS:
            value = filters.get(key)
            if value:
                qs = qs.filter(**{lookup: value})
        total = await qs.count()
        rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
        items = [self._invoice_to_out(row) for row in rows]
        return InvoiceList(items=items, total=total, page=page, page_size=page_size)

    async def update_status(self, data: UpdateStatus) -> Optional[InvoiceOut]:
        """Update the status of an invoice.

        Args:
            data: Carries the invoice ``id`` and the target ``status``.

        Returns:
            The updated invoice, or None when no such invoice exists.
        """
        inv = await self.model.filter(id=data.id).first()
        if not inv:
            return None
        inv.status = data.status
        await inv.save()
        return await self.get_out(inv.id)

    async def update_type(self, id_: int, data: UpdateType) -> Optional[InvoiceOut]:
        """Update the ticket type and invoice type of an invoice.

        Args:
            id_: Invoice ID.
            data: Carries the new ``ticket_type`` and ``invoice_type``.

        Returns:
            The updated invoice, or None when no such invoice exists.
        """
        inv = await self.model.filter(id=id_).first()
        if not inv:
            return None
        inv.ticket_type = data.ticket_type
        inv.invoice_type = data.invoice_type
        await inv.save()
        return await self.get_out(inv.id)

    async def get_out(self, id_: int) -> Optional[InvoiceOut]:
        """Fetch one invoice by ID as an output object.

        Args:
            id_: Invoice ID.

        Returns:
            ``InvoiceOut``, or None when no such invoice exists.
        """
        inv = await self.model.filter(id=id_).first()
        if not inv:
            return None
        return self._invoice_to_out(inv)

    # ------------------------------------------------------------------
    # Payment receipts
    # ------------------------------------------------------------------

    async def create_receipt(self, invoice_id: int, data: PaymentReceiptCreate) -> PaymentReceiptOut:
        """Store a payment receipt for an invoice.

        Args:
            invoice_id: ID of the invoice the receipt belongs to.
            data: Receipt creation payload.

        Returns:
            The created receipt as ``PaymentReceiptOut``.
        """
        receipt = await PaymentReceipt.create(invoice_id=invoice_id, **data.model_dump())
        return PaymentReceiptOut(
            id=receipt.id,
            url=receipt.url,
            note=receipt.note,
            verified=receipt.verified,
            created_at=receipt.created_at.isoformat() if receipt.created_at else "",
            extra=receipt.extra,
        )

    async def list_receipts(self, page: int = 1, page_size: int = 10, **filters) -> dict:
        """List payment receipts (bank-transfer records), newest first.

        Args:
            page: 1-based page number.
            page_size: Number of rows per page.
            **filters: Optional invoice-based filters ``phone``, ``wechat``,
                ``company_name``, ``tax_number`` (substring, case-insensitive)
                and ``status``, ``ticket_type``, ``invoice_type`` (exact).
                Submission time is limited either by ``created_at`` (a
                two-element list/tuple ``[start, end]``) or, when that is
                absent, by ``submitted_start`` / ``submitted_end``. Time values
                may be millisecond timestamps or ISO strings; unparseable
                values are ignored.

        Returns:
            ``{"items": [...], "total": int, "page": int, "page_size": int}``.
        """
        qs = PaymentReceipt.all().prefetch_related("invoice")

        # Filter through the related invoice.
        for key, lookup in self._RECEIPT_FILTERS:
            value = filters.get(key)
            if value:
                qs = qs.filter(**{lookup: value})

        # Time-window filter on the receipt's submission time.
        # NOTE(review): parsed datetimes are naive local times - confirm this
        # matches the time zone configuration of the ORM.
        created_range = filters.get("created_at")
        if created_range and isinstance(created_range, (list, tuple)) and len(created_range) == 2:
            start_dt = self._parse_time(created_range[0])
            end_dt = self._parse_time(created_range[1])
            # The range is applied only when both ends are usable.
            if start_dt is not None and end_dt is not None:
                qs = qs.filter(created_at__gte=start_dt, created_at__lte=end_dt)
        else:
            bounds = (
                ("submitted_start", "created_at__gte"),
                ("submitted_end", "created_at__lte"),
            )
            for key, lookup in bounds:
                raw = filters.get(key)
                if not raw:
                    continue
                moment = self._parse_time(raw)
                if moment is not None:
                    qs = qs.filter(**{lookup: moment})

        total = await qs.count()
        rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
        items = []
        for row in rows:
            invoice = await self._load_invoice(row)
            items.append(self._receipt_to_dict(row, invoice))
        return {"items": items, "total": total, "page": page, "page_size": page_size}

    async def get_receipt_by_id(self, id_: int) -> Optional[dict]:
        """Fetch the details of one payment receipt.

        Args:
            id_: Payment receipt ID.

        Returns:
            A dict shaped like the items of ``list_receipts``, or None when no
            such receipt exists.
        """
        receipt = await PaymentReceipt.filter(id=id_).first()
        if not receipt:
            return None
        invoice = await self._load_invoice(receipt)
        return self._receipt_to_dict(receipt, invoice)
# Module-level InvoiceController instance, created when this module is imported.
invoice_controller = InvoiceController()