guzhi/app/services/email_client.py
邹方成 cc352d3184 feat: 重构后端服务并添加新功能
refactor: 优化API路由和响应模型
feat(admin): 添加App用户管理接口
feat(sms): 实现阿里云短信服务集成
feat(email): 添加SMTP邮件发送功能
feat(upload): 支持文件上传接口
feat(rate-limiter): 实现手机号限流器
fix: 修复计算步骤入库问题
docs: 更新API文档和测试计划
chore: 更新依赖和配置
2025-11-19 19:36:03 +08:00

49 lines
1.8 KiB
Python

import smtplib
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from typing import Optional
import httpx
from app.settings.config import settings
class EmailClient:
def send(self, to_email: str, subject: Optional[str], body: str, file_bytes: Optional[bytes], file_name: Optional[str], content_type: Optional[str]) -> dict:
if not settings.SMTP_HOST or not settings.SMTP_PORT or not settings.SMTP_FROM:
raise RuntimeError("SMTP 未配置")
msg = MIMEMultipart()
msg["From"] = settings.SMTP_FROM
msg["To"] = to_email
msg["Subject"] = subject or "估值服务通知"
msg.attach(MIMEText(body, "plain", "utf-8"))
if file_bytes and file_name:
part = MIMEBase("application", "octet-stream")
part.set_payload(file_bytes)
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename=\"{file_name}\"")
msg.attach(part)
if settings.SMTP_TLS:
server = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT)
server.starttls()
else:
server = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT)
try:
if settings.SMTP_USERNAME and settings.SMTP_PASSWORD:
server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
server.sendmail(settings.SMTP_FROM, [to_email], msg.as_string())
server.quit()
return {"status": "OK"}
except Exception as e:
try:
server.quit()
except Exception:
pass
return {"status": "FAIL", "error": str(e)}
email_client = EmailClient()