guzhi/app/services/sms_store.py

158 lines
4.6 KiB
Python

import random
import secrets
import time
from datetime import date
from typing import Any, Dict, Optional, Tuple

from app.settings import settings
class VerificationStore:
    """Store for SMS verification codes plus send/verify rate limiting.

    All state is kept in plain dictionaries on the instance, keyed by phone
    number; no locking or persistence is performed here.
    """

    def __init__(
        self,
        code_ttl_seconds: int = 300,
        minute_window: int = 60,
        daily_limit: int = 10,
        max_failures: int = 5,
        lock_seconds: int = 3600,
    ) -> None:
        """Configure the limits and create the empty state tables.

        Args:
            code_ttl_seconds: Lifetime of a verification code, in seconds.
            minute_window: Minimum number of seconds between two sends to
                the same phone number.
            daily_limit: Maximum number of sends per phone number per day.
            max_failures: Number of failed verifications that triggers a lock.
            lock_seconds: Duration of the lock, in seconds.

        Returns:
            None
        """
        self.code_ttl = code_ttl_seconds
        self.minute_window = minute_window
        self.daily_limit = daily_limit
        self.max_failures = max_failures
        self.lock_seconds = lock_seconds
        # phone -> (code, expiry timestamp)
        self.codes: Dict[str, Tuple[str, float]] = {}
        # phone -> {"day": ISO date string, "count": float, "last_ts": float}
        self.sends: Dict[str, Dict[str, Any]] = {}
        # phone -> {"count": float, "lock_until": float}
        self.failures: Dict[str, Dict[str, float]] = {}
        # phone -> timestamp until which the phone counts as verified
        self.verified: Dict[str, float] = {}

    def generate_code(self) -> str:
        """Generate a zero-padded numeric verification code.

        The length is read from ``settings.SMS_CODE_DIGITS`` and defaults to
        6 when the setting is missing, falsy, or not positive.

        Returns:
            A string of decimal digits of the configured length.
        """
        digits = int(getattr(settings, "SMS_CODE_DIGITS", 6) or 6)
        if digits < 1:
            # A zero/negative length would yield a constant or invalid code.
            digits = 6
        # Verification codes are secrets: draw them from the OS CSPRNG
        # rather than the predictable Mersenne Twister behind ``random``.
        return f"{secrets.randbelow(10 ** digits):0{digits}d}"

    def set_code(self, phone: str, code: str) -> None:
        """Store a code for a phone number together with its expiry time.

        Args:
            phone: Phone number.
            code: Verification code.

        Returns:
            None
        """
        expires_at = time.time() + self.code_ttl
        self.codes[phone] = (code, expires_at)

    def get_code(self, phone: str) -> Optional[Tuple[str, float]]:
        """Return the stored code and expiry time for a phone number.

        The entry is returned as stored; expiry is not checked here.

        Args:
            phone: Phone number.

        Returns:
            A ``(code, expires_at)`` tuple, or ``None`` if nothing is stored.
        """
        return self.codes.get(phone)

    def clear_code(self, phone: str) -> None:
        """Remove the stored code for a phone number, if any.

        Args:
            phone: Phone number.

        Returns:
            None
        """
        self.codes.pop(phone, None)

    def allow_send(self, phone: str) -> Tuple[bool, Optional[str]]:
        """Check whether a code may be sent now and, if so, record the send.

        Args:
            phone: Phone number.

        Returns:
            ``(allowed, reason)``; ``reason`` is ``None`` when allowed.
        """
        now = time.time()
        today = date.today().isoformat()
        info = self.sends.get(phone) or {"day": today, "count": 0.0, "last_ts": 0.0}
        if info["day"] != today:
            # First request on a new day: start a fresh daily counter.
            info = {"day": today, "count": 0.0, "last_ts": 0.0}
        if now - info["last_ts"] < self.minute_window:
            self.sends[phone] = info
            return False, "发送频率过高"
        if info["count"] >= float(self.daily_limit):
            self.sends[phone] = info
            return False, "今日发送次数已达上限"
        info["last_ts"] = now
        info["count"] = info["count"] + 1.0
        self.sends[phone] = info
        return True, None

    def can_verify(self, phone: str) -> Tuple[bool, Optional[str]]:
        """Check whether the phone number is currently allowed to verify.

        Args:
            phone: Phone number.

        Returns:
            ``(allowed, reason)``; ``reason`` is ``None`` when allowed.
        """
        now = time.time()
        stat = self.failures.get(phone)
        if stat and stat.get("lock_until", 0.0) > now:
            return False, "尝试次数过多,已锁定"
        return True, None

    def record_verify_failure(self, phone: str) -> Tuple[int, bool]:
        """Record one failed verification and report whether a lock applies.

        While a lock is active the counter is left untouched. Once a lock
        has expired the counter starts over, so the first failure after a
        lock does not immediately lock the number again.

        Args:
            phone: Phone number.

        Returns:
            ``(failure_count, locked)``.
        """
        now = time.time()
        stat = self.failures.get(phone) or {"count": 0.0, "lock_until": 0.0}
        lock_until = stat.get("lock_until", 0.0)
        if lock_until > now:
            return int(stat.get("count", 0.0)), True
        if lock_until > 0.0:
            # A previous lock has run out: begin a new failure window.
            stat = {"count": 0.0, "lock_until": 0.0}
        stat["count"] = stat.get("count", 0.0) + 1.0
        if int(stat["count"]) >= self.max_failures:
            stat["lock_until"] = now + self.lock_seconds
        self.failures[phone] = stat
        return int(stat["count"]), stat.get("lock_until", 0.0) > now

    def reset_failures(self, phone: str) -> None:
        """Forget the failure counter and any lock for a phone number.

        Args:
            phone: Phone number.

        Returns:
            None
        """
        self.failures.pop(phone, None)

    def mark_verified(self, phone: str, ttl_seconds: int = 300) -> None:
        """Mark a phone number as verified for the next ``ttl_seconds``.

        Args:
            phone: Phone number.
            ttl_seconds: How long the verified mark stays valid, in seconds.

        Returns:
            None
        """
        until = time.time() + ttl_seconds
        self.verified[phone] = until

    def is_recently_verified(self, phone: str) -> bool:
        """Tell whether the phone number has an unexpired verified mark.

        Args:
            phone: Phone number.

        Returns:
            ``True`` if a mark exists and has not yet expired.
        """
        until = self.verified.get(phone, 0.0)
        return until > time.time()
# Module-level VerificationStore instance built with the default limits.
# NOTE(review): presumably imported elsewhere as the shared store — confirm against callers.
store = VerificationStore()