382 lines
14 KiB
Python
382 lines
14 KiB
Python
from typing import Optional, List
|
||
from tortoise.queryset import QuerySet
|
||
|
||
from app.core.crud import CRUDBase
|
||
from app.models.invoice import Invoice, InvoiceHeader, PaymentReceipt
|
||
from app.schemas.invoice import (
|
||
InvoiceCreate,
|
||
InvoiceUpdate,
|
||
InvoiceOut,
|
||
InvoiceList,
|
||
InvoiceHeaderCreate,
|
||
InvoiceHeaderUpdate,
|
||
InvoiceHeaderOut,
|
||
UpdateStatus,
|
||
UpdateType,
|
||
PaymentReceiptCreate,
|
||
PaymentReceiptOut,
|
||
)
|
||
|
||
|
||
class InvoiceController(CRUDBase[Invoice, InvoiceCreate, InvoiceUpdate]):
|
||
"""发票控制器"""
|
||
|
||
def __init__(self):
|
||
super().__init__(model=Invoice)
|
||
|
||
async def create_header(self, user_id: Optional[int], data: InvoiceHeaderCreate) -> InvoiceHeaderOut:
|
||
"""
|
||
创建发票抬头
|
||
参数:
|
||
user_id: 关联的 AppUser ID(可选)
|
||
data: 发票抬头创建数据
|
||
返回:
|
||
InvoiceHeaderOut: 抬头输出对象
|
||
"""
|
||
payload = data.model_dump()
|
||
for k in ["register_address", "register_phone", "bank_name", "bank_account"]:
|
||
if payload.get(k) is None:
|
||
payload[k] = ""
|
||
if payload.get("is_default"):
|
||
if user_id is not None:
|
||
await InvoiceHeader.filter(app_user_id=user_id).update(is_default=False)
|
||
header = await InvoiceHeader.create(app_user_id=user_id, **payload)
|
||
return InvoiceHeaderOut.model_validate(header)
|
||
|
||
async def get_headers(self, user_id: Optional[int] = None) -> List[InvoiceHeaderOut]:
|
||
"""
|
||
获取发票抬头列表
|
||
参数:
|
||
user_id: 可筛选 AppUser 的抬头
|
||
返回:
|
||
List[InvoiceHeaderOut]: 抬头列表
|
||
"""
|
||
qs = InvoiceHeader.all()
|
||
if user_id is not None:
|
||
qs = qs.filter(app_user_id=user_id)
|
||
headers = await qs.order_by("-created_at")
|
||
return [InvoiceHeaderOut.model_validate(h) for h in headers]
|
||
|
||
async def get_header_by_id(self, id_: int) -> Optional[InvoiceHeaderOut]:
|
||
"""
|
||
根据ID获取抬头
|
||
参数:
|
||
id_: 抬头ID
|
||
返回:
|
||
InvoiceHeaderOut 或 None
|
||
"""
|
||
header = await InvoiceHeader.filter(id=id_).first()
|
||
return InvoiceHeaderOut.model_validate(header) if header else None
|
||
|
||
async def delete_header(self, id_: int) -> bool:
|
||
header = await InvoiceHeader.filter(id=id_).first()
|
||
if not header:
|
||
return False
|
||
await header.delete()
|
||
return True
|
||
|
||
async def update_header(self, id_: int, data: InvoiceHeaderUpdate) -> Optional[InvoiceHeaderOut]:
|
||
header = await InvoiceHeader.filter(id=id_).first()
|
||
if not header:
|
||
return None
|
||
update_data = data.model_dump(exclude_unset=True)
|
||
if update_data:
|
||
if update_data.get("is_default"):
|
||
if header.app_user_id is not None:
|
||
await InvoiceHeader.filter(app_user_id=header.app_user_id).exclude(id=header.id).update(is_default=False)
|
||
await header.update_from_dict(update_data).save()
|
||
# 同步引用该抬头的发票基本信息
|
||
sync_fields = {
|
||
"company_name": header.company_name,
|
||
"tax_number": header.tax_number,
|
||
"register_address": header.register_address,
|
||
"register_phone": header.register_phone,
|
||
"bank_name": header.bank_name,
|
||
"bank_account": header.bank_account,
|
||
"email": header.email,
|
||
}
|
||
await Invoice.filter(header_id=header.id).update(**sync_fields)
|
||
return InvoiceHeaderOut.model_validate(header)
|
||
|
||
async def list(self, page: int = 1, page_size: int = 10, **filters) -> InvoiceList:
|
||
"""
|
||
获取发票列表(支持筛选与分页)
|
||
参数:
|
||
page: 页码
|
||
page_size: 每页数量
|
||
**filters: phone、company_name、tax_number、status、ticket_type、invoice_type、时间范围等
|
||
返回:
|
||
InvoiceList: 分页结果
|
||
"""
|
||
qs: QuerySet = self.model.all()
|
||
if filters.get("phone"):
|
||
qs = qs.filter(phone__icontains=filters["phone"])
|
||
if filters.get("company_name"):
|
||
qs = qs.filter(company_name__icontains=filters["company_name"])
|
||
if filters.get("tax_number"):
|
||
qs = qs.filter(tax_number__icontains=filters["tax_number"])
|
||
if filters.get("status"):
|
||
qs = qs.filter(status=filters["status"])
|
||
if filters.get("ticket_type"):
|
||
qs = qs.filter(ticket_type=filters["ticket_type"])
|
||
if filters.get("invoice_type"):
|
||
qs = qs.filter(invoice_type=filters["invoice_type"])
|
||
if filters.get("app_user_id"):
|
||
qs = qs.filter(app_user_id=filters["app_user_id"])
|
||
|
||
total = await qs.count()
|
||
rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
|
||
|
||
items = [
|
||
InvoiceOut(
|
||
id=row.id,
|
||
created_at=row.created_at.isoformat() if row.created_at else "",
|
||
ticket_type=row.ticket_type,
|
||
invoice_type=row.invoice_type,
|
||
phone=row.phone,
|
||
email=row.email,
|
||
company_name=row.company_name,
|
||
tax_number=row.tax_number,
|
||
register_address=row.register_address,
|
||
register_phone=row.register_phone,
|
||
bank_name=row.bank_name,
|
||
bank_account=row.bank_account,
|
||
status=row.status,
|
||
app_user_id=row.app_user_id,
|
||
header_id=row.header_id,
|
||
wechat=row.wechat,
|
||
)
|
||
for row in rows
|
||
]
|
||
|
||
return InvoiceList(items=items, total=total, page=page, page_size=page_size)
|
||
|
||
async def update_status(self, data: UpdateStatus) -> Optional[InvoiceOut]:
|
||
"""
|
||
更新发票状态
|
||
参数:
|
||
data: 包含 id 与目标状态
|
||
返回:
|
||
更新后的发票输出或 None
|
||
"""
|
||
inv = await self.model.filter(id=data.id).first()
|
||
if not inv:
|
||
return None
|
||
inv.status = data.status
|
||
await inv.save()
|
||
return await self.get_out(inv.id)
|
||
|
||
async def update_type(self, id_: int, data: UpdateType) -> Optional[InvoiceOut]:
|
||
"""
|
||
更新发票类型(电子/纸质、专票/普票)
|
||
参数:
|
||
id_: 发票ID
|
||
data: 类型更新数据
|
||
返回:
|
||
更新后的发票输出或 None
|
||
"""
|
||
inv = await self.model.filter(id=id_).first()
|
||
if not inv:
|
||
return None
|
||
inv.ticket_type = data.ticket_type
|
||
inv.invoice_type = data.invoice_type
|
||
await inv.save()
|
||
return await self.get_out(inv.id)
|
||
|
||
async def create_receipt(self, invoice_id: int, data: PaymentReceiptCreate) -> PaymentReceiptOut:
|
||
"""
|
||
上传付款凭证
|
||
参数:
|
||
invoice_id: 发票ID
|
||
data: 凭证创建数据
|
||
返回:
|
||
PaymentReceiptOut
|
||
"""
|
||
receipt = await PaymentReceipt.create(invoice_id=invoice_id, **data.model_dump())
|
||
return PaymentReceiptOut(
|
||
id=receipt.id,
|
||
url=receipt.url,
|
||
note=receipt.note,
|
||
verified=receipt.verified,
|
||
created_at=receipt.created_at.isoformat() if receipt.created_at else "",
|
||
extra=receipt.extra,
|
||
)
|
||
|
||
async def list_receipts(self, page: int = 1, page_size: int = 10, **filters) -> dict:
|
||
"""
|
||
对公转账记录列表
|
||
参数:
|
||
page: 页码
|
||
page_size: 每页数量
|
||
**filters: 提交时间范围、手机号、微信号、公司名称/税号、状态、开票类型等
|
||
返回:
|
||
dict: { items, total, page, page_size }
|
||
"""
|
||
qs = PaymentReceipt.all().prefetch_related("invoice")
|
||
# 通过关联发票进行筛选
|
||
if filters.get("phone"):
|
||
qs = qs.filter(invoice__phone__icontains=filters["phone"])
|
||
if filters.get("wechat"):
|
||
qs = qs.filter(invoice__wechat__icontains=filters["wechat"])
|
||
if filters.get("company_name"):
|
||
qs = qs.filter(invoice__company_name__icontains=filters["company_name"])
|
||
if filters.get("tax_number"):
|
||
qs = qs.filter(invoice__tax_number__icontains=filters["tax_number"])
|
||
if filters.get("status"):
|
||
qs = qs.filter(invoice__status=filters["status"])
|
||
if filters.get("ticket_type"):
|
||
qs = qs.filter(invoice__ticket_type=filters["ticket_type"])
|
||
if filters.get("invoice_type"):
|
||
qs = qs.filter(invoice__invoice_type=filters["invoice_type"])
|
||
|
||
# 时间区间筛选(凭证提交时间)
|
||
created_range = filters.get("created_at")
|
||
submitted_start = filters.get("submitted_start")
|
||
submitted_end = filters.get("submitted_end")
|
||
if created_range and isinstance(created_range, (list, tuple)) and len(created_range) == 2:
|
||
try:
|
||
# 前端可能传毫秒时间戳
|
||
start_ms = int(created_range[0])
|
||
end_ms = int(created_range[1])
|
||
from datetime import datetime
|
||
start_dt = datetime.fromtimestamp(start_ms / 1000)
|
||
end_dt = datetime.fromtimestamp(end_ms / 1000)
|
||
qs = qs.filter(created_at__gte=start_dt, created_at__lte=end_dt)
|
||
except Exception:
|
||
pass
|
||
else:
|
||
from datetime import datetime
|
||
def parse_time(v):
|
||
try:
|
||
iv = int(v)
|
||
return datetime.fromtimestamp(iv / 1000)
|
||
except Exception:
|
||
try:
|
||
# ISO 字符串
|
||
return datetime.fromisoformat(v)
|
||
except Exception:
|
||
return None
|
||
if submitted_start:
|
||
s_dt = parse_time(submitted_start)
|
||
if s_dt:
|
||
qs = qs.filter(created_at__gte=s_dt)
|
||
if submitted_end:
|
||
e_dt = parse_time(submitted_end)
|
||
if e_dt:
|
||
qs = qs.filter(created_at__lte=e_dt)
|
||
|
||
total = await qs.count()
|
||
rows = await qs.order_by("-created_at").offset((page - 1) * page_size).limit(page_size)
|
||
|
||
items = []
|
||
for r in rows:
|
||
inv = await r.invoice
|
||
urls = []
|
||
if isinstance(r.extra, list):
|
||
urls = [str(u) for u in r.extra if u]
|
||
elif isinstance(r.extra, dict):
|
||
v = r.extra.get("urls")
|
||
if isinstance(v, list):
|
||
urls = [str(u) for u in v if u]
|
||
if not urls:
|
||
urls = [r.url] if r.url else []
|
||
receipts = [{"id": r.id, "url": u, "note": r.note, "verified": r.verified} for u in urls]
|
||
items.append({
|
||
"id": r.id,
|
||
"invoice_id": getattr(inv, "id", None),
|
||
"submitted_at": r.created_at.isoformat() if r.created_at else "",
|
||
"receipt_uploaded_at": r.updated_at.isoformat() if getattr(r, "updated_at", None) else "",
|
||
"extra": r.extra,
|
||
"receipts": receipts,
|
||
"phone": inv.phone,
|
||
"wechat": inv.wechat,
|
||
"company_name": inv.company_name,
|
||
"tax_number": inv.tax_number,
|
||
"register_address": inv.register_address,
|
||
"register_phone": inv.register_phone,
|
||
"bank_name": inv.bank_name,
|
||
"bank_account": inv.bank_account,
|
||
"email": inv.email,
|
||
"ticket_type": inv.ticket_type,
|
||
"invoice_type": inv.invoice_type,
|
||
"status": inv.status,
|
||
})
|
||
|
||
return {"items": items, "total": total, "page": page, "page_size": page_size}
|
||
|
||
async def get_receipt_by_id(self, id_: int) -> Optional[dict]:
|
||
"""
|
||
对公转账记录详情
|
||
参数:
|
||
id_: 付款凭证ID
|
||
返回:
|
||
dict 或 None
|
||
"""
|
||
r = await PaymentReceipt.filter(id=id_).first()
|
||
if not r:
|
||
return None
|
||
inv = await r.invoice
|
||
urls = []
|
||
if isinstance(r.extra, list):
|
||
urls = [str(u) for u in r.extra if u]
|
||
elif isinstance(r.extra, dict):
|
||
v = r.extra.get("urls")
|
||
if isinstance(v, list):
|
||
urls = [str(u) for u in v if u]
|
||
if not urls:
|
||
urls = [r.url] if r.url else []
|
||
receipts = [{"id": r.id, "url": u, "note": r.note, "verified": r.verified} for u in urls]
|
||
return {
|
||
"id": r.id,
|
||
"invoice_id": getattr(inv, "id", None),
|
||
"submitted_at": r.created_at.isoformat() if r.created_at else "",
|
||
"receipt_uploaded_at": r.updated_at.isoformat() if getattr(r, "updated_at", None) else "",
|
||
"extra": r.extra,
|
||
"receipts": receipts,
|
||
"phone": inv.phone,
|
||
"wechat": inv.wechat,
|
||
"company_name": inv.company_name,
|
||
"tax_number": inv.tax_number,
|
||
"register_address": inv.register_address,
|
||
"register_phone": inv.register_phone,
|
||
"bank_name": inv.bank_name,
|
||
"bank_account": inv.bank_account,
|
||
"email": inv.email,
|
||
"ticket_type": inv.ticket_type,
|
||
"invoice_type": inv.invoice_type,
|
||
"status": inv.status,
|
||
}
|
||
|
||
async def get_out(self, id_: int) -> Optional[InvoiceOut]:
|
||
"""
|
||
根据ID返回发票输出对象
|
||
参数:
|
||
id_: 发票ID
|
||
返回:
|
||
InvoiceOut 或 None
|
||
"""
|
||
inv = await self.model.filter(id=id_).first()
|
||
if not inv:
|
||
return None
|
||
return InvoiceOut(
|
||
id=inv.id,
|
||
created_at=inv.created_at.isoformat() if inv.created_at else "",
|
||
ticket_type=inv.ticket_type,
|
||
invoice_type=inv.invoice_type,
|
||
phone=inv.phone,
|
||
email=inv.email,
|
||
company_name=inv.company_name,
|
||
tax_number=inv.tax_number,
|
||
register_address=inv.register_address,
|
||
register_phone=inv.register_phone,
|
||
bank_name=inv.bank_name,
|
||
bank_account=inv.bank_account,
|
||
status=inv.status,
|
||
app_user_id=inv.app_user_id,
|
||
header_id=inv.header_id,
|
||
wechat=inv.wechat,
|
||
)
|
||
|
||
|
||
invoice_controller = InvoiceController()
|