feat(交易记录): 新增交易记录管理页面与API接口 feat(上传): 添加统一上传接口支持自动识别文件类型 feat(用户管理): 为用户模型添加备注字段并更新相关接口 feat(邮件): 实现SMTP邮件发送功能并添加测试脚本 feat(短信): 增强短信服务配置灵活性与日志记录 fix(发票): 修复发票列表时间筛选功能 fix(nginx): 调整上传大小限制与超时配置 docs: 添加多个功能模块的说明文档 docs(估值): 补充估值计算流程与API提交数据说明 chore: 更新依赖与Docker镜像版本
103 lines
3.2 KiB
Python
103 lines
3.2 KiB
Python
import os
|
||
import smtplib
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.text import MIMEText
|
||
from typing import Dict, Optional
|
||
|
||
|
||
def _parse_bool(value: Optional[str]) -> bool:
|
||
"""
|
||
功能: 将环境变量中的布尔字符串解析为布尔值
|
||
参数: value (Optional[str]) - 环境变量字符串值, 如 "true", "1", "yes"
|
||
返回: bool - 解析后的布尔值
|
||
"""
|
||
if value is None:
|
||
return False
|
||
return str(value).strip().lower() in {"1", "true", "yes", "y"}
|
||
|
||
|
||
def get_smtp_config() -> Dict[str, Optional[str]]:
|
||
"""
|
||
功能: 从环境变量读取SMTP配置并返回配置字典
|
||
参数: 无
|
||
返回: Dict[str, Optional[str]] - 包含host、port、from、username、password、tls等键的配置
|
||
"""
|
||
host = os.environ.get("SMTP_HOST", "smtp.qiye.aliyun.com")
|
||
port_str = os.environ.get("SMTP_PORT", "465")
|
||
from_addr = os.environ.get("SMTP_FROM","value@cdcee.net")
|
||
username = os.environ.get("SMTP_USERNAME","value@cdcee.net")
|
||
password = os.environ.get("SMTP_PASSWORD","PPXbILdGlRCn2VOx")
|
||
tls = _parse_bool(os.environ.get("SMTP_TLS"))
|
||
|
||
port = None
|
||
if port_str:
|
||
try:
|
||
port = int(port_str)
|
||
except Exception:
|
||
port = None
|
||
|
||
return {
|
||
"host": host,
|
||
"port": port,
|
||
"from": from_addr,
|
||
"username": username,
|
||
"password": password,
|
||
"tls": tls,
|
||
}
|
||
|
||
|
||
def send_test_email(to_email: str, subject: Optional[str], body: str) -> Dict[str, str]:
|
||
"""
|
||
功能: 使用SMTP配置发送测试邮件到指定邮箱
|
||
参数: to_email (str) - 收件人邮箱; subject (Optional[str]) - 邮件主题; body (str) - 邮件正文内容
|
||
返回: Dict[str, str] - 发送结果字典, 包含status("OK"/"FAIL")与error(失败信息)
|
||
"""
|
||
cfg = get_smtp_config()
|
||
if not cfg["host"] or not cfg["port"] or not cfg["from"]:
|
||
return {"status": "FAIL", "error": "SMTP 未配置: 需设置 SMTP_HOST/SMTP_PORT/SMTP_FROM"}
|
||
|
||
msg = MIMEMultipart()
|
||
msg["From"] = cfg["from"]
|
||
msg["To"] = to_email
|
||
msg["Subject"] = subject or "估值服务通知"
|
||
msg.attach(MIMEText(body, "plain", "utf-8"))
|
||
|
||
server = None
|
||
try:
|
||
if cfg["tls"]:
|
||
server = smtplib.SMTP(cfg["host"], cfg["port"], timeout=30)
|
||
server.starttls()
|
||
else:
|
||
server = smtplib.SMTP_SSL(cfg["host"], cfg["port"], timeout=30)
|
||
|
||
if cfg["username"] and cfg["password"]:
|
||
server.login(cfg["username"], cfg["password"])
|
||
|
||
server.sendmail(cfg["from"], [to_email], msg.as_string())
|
||
server.quit()
|
||
return {"status": "OK"}
|
||
except Exception as e:
|
||
try:
|
||
if server:
|
||
server.quit()
|
||
except Exception:
|
||
pass
|
||
return {"status": "FAIL", "error": str(e)}
|
||
|
||
|
||
if __name__ == "__main__":
|
||
to = "zfc9393@163.com"
|
||
subject = "测试邮件"
|
||
body = "这是一封测试邮件,用于验证SMTP配置。"
|
||
|
||
cfg = get_smtp_config()
|
||
print({
|
||
"host": cfg["host"],
|
||
"port": cfg["port"],
|
||
"from": cfg["from"],
|
||
"username_set": bool(cfg["username"]),
|
||
"password_set": bool(cfg["password"]),
|
||
"tls": cfg["tls"],
|
||
})
|
||
result = send_test_email(to, subject, body)
|
||
print(result) |