import smtplib from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email import encoders from typing import Optional, List, Tuple import httpx from app.settings.config import settings class EmailClient: def send(self, to_email: str, subject: Optional[str], body: str, file_bytes: Optional[bytes], file_name: Optional[str], content_type: Optional[str]) -> dict: if not settings.SMTP_HOST or not settings.SMTP_PORT or not settings.SMTP_FROM: raise RuntimeError("SMTP 未配置") msg = MIMEMultipart() msg["From"] = settings.SMTP_FROM msg["To"] = to_email msg["Subject"] = subject or "估值服务通知" msg.attach(MIMEText(body, "plain", "utf-8")) if file_bytes and file_name: part = MIMEBase("application", "octet-stream") part.set_payload(file_bytes) encoders.encode_base64(part) part.add_header("Content-Disposition", f"attachment; filename=\"{file_name}\"") msg.attach(part) if hasattr(self, "_extra_attachments") and isinstance(self._extra_attachments, list): for fb, fn in self._extra_attachments: part = MIMEBase("application", "octet-stream") part.set_payload(fb) encoders.encode_base64(part) part.add_header("Content-Disposition", f"attachment; filename=\"{fn}\"") msg.attach(part) if settings.SMTP_TLS: server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT, timeout=30) server.starttls() else: server = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT, timeout=30) try: if settings.SMTP_USERNAME and settings.SMTP_PASSWORD: server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD) server.sendmail(settings.SMTP_FROM, [to_email], msg.as_string()) server.quit() return {"status": "OK"} except Exception as e: try: server.quit() except Exception: pass if isinstance(e, smtplib.SMTPRecipientsRefused): return {"status": "FAIL", "error": "收件方地址不存在或暂时不可用"} return {"status": "FAIL", "error": str(e)} def send_many(self, to_email: str, subject: Optional[str], body: str, attachments: Optional[List[Tuple[bytes, str]]] = None) -> dict: self._extra_attachments = attachments or [] try: return self.send(to_email, subject, body, None, None, None) finally: self._extra_attachments = [] email_client = EmailClient()