guzhi/app/services/sms_store.py
邹方成 cc352d3184 feat: 重构后端服务并添加新功能
refactor: 优化API路由和响应模型
feat(admin): 添加App用户管理接口
feat(sms): 实现阿里云短信服务集成
feat(email): 添加SMTP邮件发送功能
feat(upload): 支持文件上传接口
feat(rate-limiter): 实现手机号限流器
fix: 修复计算步骤入库问题
docs: 更新API文档和测试计划
chore: 更新依赖和配置
2025-11-19 19:36:03 +08:00

144 lines
4.1 KiB
Python

import random
import time
from datetime import date
from typing import Dict, Optional, Tuple
class VerificationStore:
def __init__(self, code_ttl_seconds: int = 300, minute_window: int = 60, daily_limit: int = 10, max_failures: int = 5, lock_seconds: int = 3600) -> None:
"""验证码与限流存储
Args:
code_ttl_seconds: 验证码有效期秒数
minute_window: 同号分钟级限流窗口
daily_limit: 每日发送上限次数
max_failures: 最大失败次数后锁定
lock_seconds: 锁定时长秒数
Returns:
None
"""
self.code_ttl = code_ttl_seconds
self.minute_window = minute_window
self.daily_limit = daily_limit
self.max_failures = max_failures
self.lock_seconds = lock_seconds
self.codes: Dict[str, Tuple[str, float]] = {}
self.sends: Dict[str, Dict[str, float]] = {}
self.failures: Dict[str, Dict[str, float]] = {}
def generate_code(self) -> str:
"""生成6位数字验证码
Returns:
六位数字字符串
"""
return f"{random.randint(0, 999999):06d}"
def set_code(self, phone: str, code: str) -> None:
"""设置验证码与过期时间
Args:
phone: 手机号
code: 验证码
Returns:
None
"""
expires_at = time.time() + self.code_ttl
self.codes[phone] = (code, expires_at)
def get_code(self, phone: str) -> Optional[Tuple[str, float]]:
"""获取存储的验证码与过期时间
Args:
phone: 手机号
Returns:
元组(code, expires_at)或None
"""
return self.codes.get(phone)
def clear_code(self, phone: str) -> None:
"""清除验证码记录
Args:
phone: 手机号
Returns:
None
"""
self.codes.pop(phone, None)
def allow_send(self, phone: str) -> Tuple[bool, Optional[str]]:
"""校验是否允许发送验证码
Args:
phone: 手机号
Returns:
(允许, 拒绝原因)
"""
now = time.time()
dkey = date.today().isoformat()
info = self.sends.get(phone) or {"day": dkey, "count": 0.0, "last_ts": 0.0}
if info["day"] != dkey:
info = {"day": dkey, "count": 0.0, "last_ts": 0.0}
if now - info["last_ts"] < self.minute_window:
self.sends[phone] = info
return False, "发送频率过高"
if info["count"] >= float(self.daily_limit):
self.sends[phone] = info
return False, "今日发送次数已达上限"
info["last_ts"] = now
info["count"] = info["count"] + 1.0
self.sends[phone] = info
return True, None
def can_verify(self, phone: str) -> Tuple[bool, Optional[str]]:
"""校验是否允许验证
Args:
phone: 手机号
Returns:
(允许, 拒绝原因)
"""
now = time.time()
stat = self.failures.get(phone)
if stat and stat.get("lock_until", 0.0) > now:
return False, "尝试次数过多,已锁定"
return True, None
def record_verify_failure(self, phone: str) -> Tuple[int, bool]:
"""记录一次验证失败并判断是否触发锁定
Args:
phone: 手机号
Returns:
(失败次数, 是否锁定)
"""
now = time.time()
stat = self.failures.get(phone) or {"count": 0.0, "lock_until": 0.0}
if stat.get("lock_until", 0.0) > now:
return int(stat["count"]), True
stat["count"] = stat.get("count", 0.0) + 1.0
if int(stat["count"]) >= self.max_failures:
stat["lock_until"] = now + self.lock_seconds
self.failures[phone] = stat
return int(stat["count"]), stat["lock_until"] > now
def reset_failures(self, phone: str) -> None:
"""重置失败计数
Args:
phone: 手机号
Returns:
None
"""
self.failures.pop(phone, None)
store = VerificationStore()