guzhi/app/controllers/invoice.py
邹方成 552c02516a feat(发票): 支持多附件上传和邮件发送功能
refactor(用户管理): 优化用户列表查询和备注字段处理

feat(估值): 评估报告和证书URL改为数组类型并添加下载地址

docs: 添加交易管理与用户备注功能增强实施计划

fix(邮件): 修复邮件发送接口的多附件支持问题

style: 清理注释代码和格式化文件
2025-11-25 20:09:50 +08:00

365 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from typing import Optional, List

from tortoise.queryset import QuerySet

from app.core.crud import CRUDBase
from app.models.invoice import Invoice, InvoiceHeader, PaymentReceipt
from app.schemas.invoice import (
    InvoiceCreate,
    InvoiceUpdate,
    InvoiceOut,
    InvoiceList,
    InvoiceHeaderCreate,
    InvoiceHeaderUpdate,
    InvoiceHeaderOut,
    UpdateStatus,
    UpdateType,
    PaymentReceiptCreate,
    PaymentReceiptOut,
)
class InvoiceController(CRUDBase[Invoice, InvoiceCreate, InvoiceUpdate]):
    """Controller for invoices, invoice headers and payment receipts."""

    # Header fields that create_header stores as "" when the payload has None.
    _HEADER_BLANKABLE_FIELDS = (
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
    )

    # (filter keyword, ORM lookup) pairs honoured by list().
    _INVOICE_FILTERS = (
        ("phone", "phone__icontains"),
        ("company_name", "company_name__icontains"),
        ("tax_number", "tax_number__icontains"),
        ("status", "status"),
        ("ticket_type", "ticket_type"),
        ("invoice_type", "invoice_type"),
        ("app_user_id", "app_user_id"),
    )

    # (filter keyword, ORM lookup) pairs honoured by list_receipts(); every
    # lookup goes through the receipt's related invoice.
    _RECEIPT_FILTERS = (
        ("phone", "invoice__phone__icontains"),
        ("wechat", "invoice__wechat__icontains"),
        ("company_name", "invoice__company_name__icontains"),
        ("tax_number", "invoice__tax_number__icontains"),
        ("status", "invoice__status"),
        ("ticket_type", "invoice__ticket_type"),
        ("invoice_type", "invoice__invoice_type"),
    )

    # Invoice attributes copied into a receipt record, in output order.
    _RECEIPT_INVOICE_FIELDS = (
        "phone",
        "wechat",
        "company_name",
        "tax_number",
        "register_address",
        "register_phone",
        "bank_name",
        "bank_account",
        "email",
        "ticket_type",
        "invoice_type",
        "status",
    )

    def __init__(self):
        super().__init__(model=Invoice)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _invoice_to_out(inv: Invoice) -> InvoiceOut:
        """Build the InvoiceOut schema from an Invoice row."""
        return InvoiceOut(
            id=inv.id,
            created_at=inv.created_at.isoformat() if inv.created_at else "",
            ticket_type=inv.ticket_type,
            invoice_type=inv.invoice_type,
            phone=inv.phone,
            email=inv.email,
            company_name=inv.company_name,
            tax_number=inv.tax_number,
            register_address=inv.register_address,
            register_phone=inv.register_phone,
            bank_name=inv.bank_name,
            bank_account=inv.bank_account,
            status=inv.status,
            app_user_id=inv.app_user_id,
            header_id=inv.header_id,
            wechat=inv.wechat,
        )

    @classmethod
    def _receipt_to_dict(cls, receipt: PaymentReceipt, inv: Optional[Invoice]) -> dict:
        """
        Build the transfer-record dict for a receipt and its invoice.

        When ``inv`` is None every invoice-derived value (including
        ``invoice_id``) is None instead of raising AttributeError.
        """
        record = {
            "id": receipt.id,
            "invoice_id": getattr(inv, "id", None),
            "submitted_at": receipt.created_at.isoformat() if receipt.created_at else "",
            "receipt_uploaded_at": (
                receipt.updated_at.isoformat() if getattr(receipt, "updated_at", None) else ""
            ),
            "extra": receipt.extra,
            "receipts": [
                {
                    "id": receipt.id,
                    "url": receipt.url,
                    "note": receipt.note,
                    "verified": receipt.verified,
                }
            ],
        }
        for field in cls._RECEIPT_INVOICE_FIELDS:
            record[field] = getattr(inv, field) if inv is not None else None
        return record

    @staticmethod
    def _parse_time(value) -> Optional[datetime]:
        """
        Parse a millisecond timestamp or an ISO-8601 string.

        Returns None when the value fits neither form.
        NOTE(review): the result is a naive datetime in the server's local
        time; confirm this matches how created_at is stored.
        """
        try:
            return datetime.fromtimestamp(int(value) / 1000)
        except (TypeError, ValueError, OverflowError, OSError):
            # Not a usable millisecond timestamp; try ISO format below.
            pass
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None

    # ------------------------------------------------------------------
    # Invoice headers
    # ------------------------------------------------------------------

    async def create_header(self, user_id: Optional[int], data: InvoiceHeaderCreate) -> InvoiceHeaderOut:
        """
        Create an invoice header.

        Args:
            user_id: ID of the owning app user, or None.
            data: Header creation payload.

        Returns:
            InvoiceHeaderOut: The created header.
        """
        payload = data.model_dump()
        for field in self._HEADER_BLANKABLE_FIELDS:
            if payload.get(field) is None:
                payload[field] = ""
        # A new default header clears the default flag on the user's others.
        if payload.get("is_default") and user_id is not None:
            await InvoiceHeader.filter(app_user_id=user_id).update(is_default=False)
        header = await InvoiceHeader.create(app_user_id=user_id, **payload)
        return InvoiceHeaderOut.model_validate(header)

    async def get_headers(self, user_id: Optional[int] = None) -> List[InvoiceHeaderOut]:
        """
        List invoice headers, newest first.

        Args:
            user_id: When given, only headers of this app user are returned.

        Returns:
            List[InvoiceHeaderOut]: Matching headers ordered by -created_at.
        """
        qs = InvoiceHeader.all()
        if user_id is not None:
            qs = qs.filter(app_user_id=user_id)
        headers = await qs.order_by("-created_at")
        return [InvoiceHeaderOut.model_validate(h) for h in headers]

    async def get_header_by_id(self, id_: int) -> Optional[InvoiceHeaderOut]:
        """
        Fetch one invoice header by primary key.

        Args:
            id_: Header ID.

        Returns:
            InvoiceHeaderOut, or None when no such header exists.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        if header is None:
            return None
        return InvoiceHeaderOut.model_validate(header)

    async def delete_header(self, id_: int) -> bool:
        """
        Delete an invoice header.

        Args:
            id_: Header ID.

        Returns:
            True when a header was found and deleted, False otherwise.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        if not header:
            return False
        await header.delete()
        return True

    async def update_header(self, id_: int, data: InvoiceHeaderUpdate) -> Optional[InvoiceHeaderOut]:
        """
        Apply a partial update to an invoice header.

        Only fields explicitly set on ``data`` are written. Setting
        ``is_default`` to a truthy value clears the flag on the same user's
        other headers first.

        Args:
            id_: Header ID.
            data: Fields to update.

        Returns:
            The updated header, or None when no such header exists.
        """
        header = await InvoiceHeader.filter(id=id_).first()
        if not header:
            return None
        update_data = data.model_dump(exclude_unset=True)
        if update_data:
            # NOTE(review): unlike create_header, None values are not turned
            # into "" here; confirm whether that difference is intended.
            if update_data.get("is_default") and header.app_user_id is not None:
                await (
                    InvoiceHeader.filter(app_user_id=header.app_user_id)
                    .exclude(id=header.id)
                    .update(is_default=False)
                )
            await header.update_from_dict(update_data).save()
        return InvoiceHeaderOut.model_validate(header)

    # ------------------------------------------------------------------
    # Invoices
    # ------------------------------------------------------------------

    async def list(self, page: int = 1, page_size: int = 10, **filters) -> InvoiceList:
        """
        List invoices with filtering and pagination, newest first.

        Args:
            page: 1-based page number.
            page_size: Number of rows per page.
            **filters: Optional phone, company_name, tax_number (substring,
                case-insensitive) and status, ticket_type, invoice_type,
                app_user_id (exact match). Falsy values are ignored. No
                time-range filter is applied by this method.

        Returns:
            InvoiceList: One page of items plus total, page and page_size.
        """
        qs: QuerySet = self.model.all()
        for key, lookup in self._INVOICE_FILTERS:
            value = filters.get(key)
            if value:
                qs = qs.filter(**{lookup: value})
        total = await qs.count()
        rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
        items = [self._invoice_to_out(row) for row in rows]
        return InvoiceList(items=items, total=total, page=page, page_size=page_size)

    async def update_status(self, data: UpdateStatus) -> Optional[InvoiceOut]:
        """
        Update the status of an invoice.

        Args:
            data: Carries the invoice id and the target status.

        Returns:
            The invoice re-read after saving, or None when it does not exist.
        """
        inv = await self.model.filter(id=data.id).first()
        if not inv:
            return None
        inv.status = data.status
        await inv.save()
        return await self.get_out(inv.id)

    async def update_type(self, id_: int, data: UpdateType) -> Optional[InvoiceOut]:
        """
        Update the ticket_type and invoice_type of an invoice.

        Args:
            id_: Invoice ID.
            data: New ticket_type and invoice_type values.

        Returns:
            The invoice re-read after saving, or None when it does not exist.
        """
        inv = await self.model.filter(id=id_).first()
        if not inv:
            return None
        inv.ticket_type = data.ticket_type
        inv.invoice_type = data.invoice_type
        await inv.save()
        return await self.get_out(inv.id)

    async def get_out(self, id_: int) -> Optional[InvoiceOut]:
        """
        Fetch one invoice by primary key as an output schema.

        Args:
            id_: Invoice ID.

        Returns:
            InvoiceOut, or None when no such invoice exists.
        """
        inv = await self.model.filter(id=id_).first()
        if not inv:
            return None
        return self._invoice_to_out(inv)

    # ------------------------------------------------------------------
    # Payment receipts
    # ------------------------------------------------------------------

    async def create_receipt(self, invoice_id: int, data: PaymentReceiptCreate) -> PaymentReceiptOut:
        """
        Create a payment receipt attached to an invoice.

        Args:
            invoice_id: ID of the invoice the receipt belongs to.
            data: Receipt creation payload.

        Returns:
            PaymentReceiptOut: The created receipt.
        """
        receipt = await PaymentReceipt.create(invoice_id=invoice_id, **data.model_dump())
        return PaymentReceiptOut(
            id=receipt.id,
            url=receipt.url,
            note=receipt.note,
            verified=receipt.verified,
            created_at=receipt.created_at.isoformat() if receipt.created_at else "",
            extra=receipt.extra,
        )

    async def list_receipts(self, page: int = 1, page_size: int = 10, **filters) -> dict:
        """
        List payment-receipt records with filtering and pagination.

        Args:
            page: 1-based page number.
            page_size: Number of rows per page.
            **filters: Optional invoice-side filters phone, wechat,
                company_name, tax_number (substring, case-insensitive) and
                status, ticket_type, invoice_type (exact match), plus a
                receipt creation-time window given either as
                ``created_at=[start, end]`` or as ``submitted_start`` /
                ``submitted_end``. Time values may be millisecond timestamps
                or ISO-8601 strings; unparseable values are ignored.

        Returns:
            dict: ``{"items", "total", "page", "page_size"}``.
        """
        qs = PaymentReceipt.all().prefetch_related("invoice")

        # Filters on the related invoice.
        for key, lookup in self._RECEIPT_FILTERS:
            value = filters.get(key)
            if value:
                qs = qs.filter(**{lookup: value})

        # Time window on the receipt's own created_at. A well-formed
        # two-element created_at range takes precedence over submitted_*.
        created_range = filters.get("created_at")
        if created_range and isinstance(created_range, (list, tuple)) and len(created_range) == 2:
            start_dt = self._parse_time(created_range[0])
            end_dt = self._parse_time(created_range[1])
            # Apply the range only when both ends parsed.
            if start_dt is not None and end_dt is not None:
                qs = qs.filter(created_at__gte=start_dt, created_at__lte=end_dt)
        else:
            bounds = (
                ("submitted_start", "created_at__gte"),
                ("submitted_end", "created_at__lte"),
            )
            for key, lookup in bounds:
                raw = filters.get(key)
                if not raw:
                    continue
                parsed = self._parse_time(raw)
                if parsed is not None:
                    qs = qs.filter(**{lookup: parsed})

        total = await qs.count()
        rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
        items = []
        for receipt in rows:
            inv = await receipt.invoice
            items.append(self._receipt_to_dict(receipt, inv))
        return {"items": items, "total": total, "page": page, "page_size": page_size}

    async def get_receipt_by_id(self, id_: int) -> Optional[dict]:
        """
        Fetch one payment-receipt record by primary key.

        Args:
            id_: Payment receipt ID.

        Returns:
            A dict with the same shape as a list_receipts item, or None when
            no such receipt exists.
        """
        receipt = await PaymentReceipt.filter(id=id_).first()
        if not receipt:
            return None
        inv = await receipt.invoice
        return self._receipt_to_dict(receipt, inv)
# Module-level InvoiceController instance, created when this module is imported.
invoice_controller = InvoiceController()