package service import ( "context" "database/sql" "encoding/json" "errors" "fmt" "strings" "sync" "time" "github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" ) const ( opsCleanupJobName = "ops_cleanup" opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader" opsCleanupLeaderLockTTLDefault = 30 * time.Minute ) var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) var opsCleanupReleaseScript = redis.NewScript(` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) end return 0 `) // OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth. // // - Scheduling: 5-field cron spec (minute hour dom month dow). // - Multi-instance: best-effort Redis leader lock so only one node runs cleanup. // - Safety: deletes in batches to avoid long transactions. // // 附带:在 runCleanupOnce 末尾调用 ChannelMonitorService.RunDailyMaintenance, // 统一共享 cron schedule + leader lock + heartbeat,避免再引一套调度。 type OpsCleanupService struct { opsRepo OpsRepository db *sql.DB redisClient *redis.Client cfg *config.Config channelMonitorSvc *ChannelMonitorService settingRepo SettingRepository instanceID string // mu 守护 cron 实例切换 + effective 配置切换。 // 这里不再用 startOnce/stopOnce,是因为 Reload 需要"停旧 cron 重启新 cron", // 而 Once 一旦触发就无法再次执行;改为 started/stopped 布尔配合 mu。 mu sync.Mutex cron *cron.Cron started bool stopped bool effective config.OpsCleanupConfig warnNoRedisOnce sync.Once } func NewOpsCleanupService( opsRepo OpsRepository, db *sql.DB, redisClient *redis.Client, cfg *config.Config, channelMonitorSvc *ChannelMonitorService, settingRepo SettingRepository, ) *OpsCleanupService { return &OpsCleanupService{ opsRepo: opsRepo, db: db, redisClient: redisClient, cfg: cfg, channelMonitorSvc: channelMonitorSvc, settingRepo: settingRepo, instanceID: uuid.NewString(), } } // Start 首次启动 cron 调度。Enabled / Schedule 由 effective 配置决定(settings 优先 cfg)。 // 重复调用幂等。 func (s *OpsCleanupService) Start() { if s == nil { return } if s.cfg != nil && !s.cfg.Ops.Enabled { return } if s.opsRepo == nil || s.db == nil { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (missing deps)") return } s.mu.Lock() defer s.mu.Unlock() if s.started || s.stopped { return } s.started = true if err := s.applyScheduleLocked(context.Background()); err != nil { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started: %v", err) } } // Stop 关闭 cron。幂等。 func (s *OpsCleanupService) Stop() { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() if s.stopped { return } s.stopped = true s.stopCronLocked() } // stopCronLocked 停掉当前 cron 实例(带 3s 超时)。调用方持锁。 func (s *OpsCleanupService) stopCronLocked() { if s.cron == nil { return } ctx := s.cron.Stop() select { case <-ctx.Done(): case <-time.After(opsCleanupCronStopTimeout): logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out") } s.cron = nil } // applyScheduleLocked 重新计算 effective 配置并按其 schedule 重建 cron。调用方持锁。 // 若 effective.Enabled=false(用户在 UI 关闭清理),停旧 cron 后直接返回,不创建新 cron。 func (s *OpsCleanupService) applyScheduleLocked(ctx context.Context) error { s.computeEffectiveLocked(ctx) s.stopCronLocked() if !s.effective.Enabled { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron disabled by settings") return nil } schedule := strings.TrimSpace(s.effective.Schedule) if schedule == "" { schedule = opsCleanupDefaultSchedule } loc := time.Local if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" { if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil { loc = parsed } } c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc)) if _, err := c.AddFunc(schedule, func() { s.runScheduled() }); err != nil { return fmt.Errorf("invalid schedule %q: %w", schedule, err) } c.Start() s.cron = c logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] scheduled (schedule=%q tz=%s retention_days=err:%d/min:%d/hour:%d)", schedule, loc.String(), s.effective.ErrorLogRetentionDays, s.effective.MinuteMetricsRetentionDays, s.effective.HourlyMetricsRetentionDays, ) return nil } // Reload 重新读取 ops_advanced_settings.data_retention 并按新配置重建 cron。 // 适用于 admin 在 UI 修改清理设置后立即生效(schedule / enabled 改动需要 Reload; // retention 改动 runScheduled 顶部也会刷新,下一次触发即生效)。 // 若 service 还未 Start 或已 Stop,Reload 不做任何事。 func (s *OpsCleanupService) Reload(ctx context.Context) error { if s == nil { return nil } s.mu.Lock() defer s.mu.Unlock() if !s.started || s.stopped { return nil } return s.applyScheduleLocked(ctx) } // computeEffectiveLocked 计算"生效配置"并写入 s.effective。调用方持锁。 // // 优先级:UI 写入的 settings.ops_advanced_settings.data_retention(权威)覆盖 cfg.Ops.Cleanup 的副本。 // - Enabled:settings 直接覆盖 // - Schedule:settings 非空时覆盖,否则保留 cfg // - *RetentionDays:settings >=0 时覆盖(包括 0=TRUNCATE),<0 沿用 cfg // // 若 settings 表无该 key(ErrSettingNotFound)或解析失败,整体 fallback 到 cfg.Ops.Cleanup。 func (s *OpsCleanupService) computeEffectiveLocked(ctx context.Context) { base := config.OpsCleanupConfig{} if s.cfg != nil { base = s.cfg.Ops.Cleanup } defer func() { s.effective = base }() if s.settingRepo == nil { return } if ctx == nil { ctx = context.Background() } raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsAdvancedSettings) if err != nil { if !errors.Is(err, ErrSettingNotFound) { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] read advanced settings failed, using cfg: %v", err) } return } var adv OpsAdvancedSettings if err := json.Unmarshal([]byte(raw), &adv); err != nil { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] parse advanced settings failed, using cfg: %v", err) return } dr := adv.DataRetention base.Enabled = dr.CleanupEnabled if sched := strings.TrimSpace(dr.CleanupSchedule); sched != "" { base.Schedule = sched } if dr.ErrorLogRetentionDays >= 0 { base.ErrorLogRetentionDays = dr.ErrorLogRetentionDays } if dr.MinuteMetricsRetentionDays >= 0 { base.MinuteMetricsRetentionDays = dr.MinuteMetricsRetentionDays } if dr.HourlyMetricsRetentionDays >= 0 { base.HourlyMetricsRetentionDays = dr.HourlyMetricsRetentionDays } } // snapshotEffective 取一份 effective 副本(runCleanupOnce 等读路径使用)。 func (s *OpsCleanupService) snapshotEffective() config.OpsCleanupConfig { s.mu.Lock() defer s.mu.Unlock() return s.effective } // refreshEffectiveBeforeRun 在 cron 触发时刷新 effective,让 retention 改动当次即生效。 // schedule 改动不影响当次(cron 调度由库管理,需要 Reload 才换 schedule)。 func (s *OpsCleanupService) refreshEffectiveBeforeRun(ctx context.Context) { s.mu.Lock() defer s.mu.Unlock() s.computeEffectiveLocked(ctx) } func (s *OpsCleanupService) runScheduled() { if s == nil || s.db == nil || s.opsRepo == nil { return } ctx, cancel := context.WithTimeout(context.Background(), opsCleanupRunTimeout) defer cancel() // 让 retention 改动当次生效(schedule/enabled 改动需要 Reload)。 s.refreshEffectiveBeforeRun(ctx) release, ok := s.tryAcquireLeaderLock(ctx) if !ok { return } if release != nil { defer release() } startedAt := time.Now().UTC() runAt := startedAt counts, err := s.runCleanupOnce(ctx) if err != nil { s.recordHeartbeatError(runAt, time.Since(startedAt), err) logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup failed: %v", err) return } s.recordHeartbeatSuccess(runAt, time.Since(startedAt), counts) logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts) } func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) { out := opsCleanupDeletedCounts{} if s == nil || s.db == nil || s.cfg == nil { return out, nil } effective := s.snapshotEffective() now := time.Now().UTC() targets := []opsCleanupTarget{ {effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs}, {effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts}, {effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents}, {effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs}, {effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits}, {effective.MinuteMetricsRetentionDays, "ops_system_metrics", "created_at", false, &out.systemMetrics}, {effective.HourlyMetricsRetentionDays, "ops_metrics_hourly", "bucket_start", false, &out.hourlyPreagg}, {effective.HourlyMetricsRetentionDays, "ops_metrics_daily", "bucket_date", true, &out.dailyPreagg}, } for _, t := range targets { cutoff, truncate, ok := opsCleanupPlan(now, t.retentionDays) if !ok { continue } n, err := opsCleanupRunOne(ctx, s.db, truncate, cutoff, t.table, t.timeCol, t.castDate, opsCleanupBatchSize) if err != nil { return out, err } *t.counter = n } // Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。 // 失败只记日志,不影响 ops 清理的成功状态(与 ops 各步骤风格一致); // 维护本身已经把每步错误打到 slog,heartbeat result 不再分项记录。 if s.channelMonitorSvc != nil { if err := s.channelMonitorSvc.RunDailyMaintenance(ctx); err != nil { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] channel monitor maintenance failed: %v", err) } } return out, nil } func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { if s == nil { return nil, false } // In simple run mode, assume single instance. if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple { return nil, true } key := opsCleanupLeaderLockKeyDefault ttl := opsCleanupLeaderLockTTLDefault // Prefer Redis leader lock when available, but avoid stampeding the DB when Redis is flaky by // falling back to a DB advisory lock. if s.redisClient != nil { ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() if err == nil { if !ok { return nil, false } return func() { _, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() }, true } // Redis error: fall back to DB advisory lock. s.warnNoRedisOnce.Do(func() { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err) }) } else { s.warnNoRedisOnce.Do(func() { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] redis not configured; using DB advisory lock") }) } release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key)) if !ok { return nil, false } return release, true } func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, counts opsCleanupDeletedCounts) { if s == nil || s.opsRepo == nil { return } now := time.Now().UTC() durMs := duration.Milliseconds() result := truncateString(counts.String(), 2048) ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, LastRunAt: &runAt, LastSuccessAt: &now, LastDurationMs: &durMs, LastResult: &result, }) } func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) { if s == nil || s.opsRepo == nil || err == nil { return } now := time.Now().UTC() durMs := duration.Milliseconds() msg := truncateString(err.Error(), 2048) ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, LastRunAt: &runAt, LastErrorAt: &now, LastError: &msg, LastDurationMs: &durMs, }) }