refactor(ops-cleanup): 拆分 executor + table-driven + 提取常量 + 补测试
代码审查反馈:

1. 文件行数超标:ops_cleanup_service.go 594→413 行。
   拆 opsCleanupPlan / deleteOldRowsByID / truncateOpsTable / isMissingRelationError + counts struct 到 ops_cleanup_executor.go (164 行)。
2. runCleanupOnce 89 行→30 行(table-driven):
   用 []opsCleanupTarget 循环替代三组重复的 opsCleanupPlan → runOne → assign。
3. 魔法值提取常量:
   opsCleanupDefaultSchedule / opsCleanupBatchSize / opsCleanupCronStopTimeout / opsCleanupRunTimeout / opsCleanupHeartbeatTimeout。
   ops_settings.go 中 "0 2 * * *" 也统一引用 opsCleanupDefaultSchedule。
4. 补 5 个缺失测试:
   - Reload 未 Start 时 no-op
   - Reload 已 Stop 后 no-op
   - cleanupReloader==nil 时 Update 不 panic
   - Start 重复调用幂等
   - refreshEffectiveBeforeRun 正确更新 snapshot
This commit is contained in:
parent
c4598aa9b6
commit
d218b6c2aa
164
backend/internal/service/ops_cleanup_executor.go
Normal file
164
backend/internal/service/ops_cleanup_executor.go
Normal file
@ -0,0 +1,164 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"
)
|
||||||
|
|
||||||
|
// Scheduling and batching defaults for the ops cleanup job.
const (
	// opsCleanupDefaultSchedule is the cron expression used whenever the
	// configured cleanup schedule is empty.
	opsCleanupDefaultSchedule = "0 2 * * *"

	// opsCleanupBatchSize is the row limit applied to each batched DELETE
	// statement; it is also the fallback when a caller passes a non-positive
	// batch size.
	opsCleanupBatchSize = 5000
)

// Time budgets for the ops cleanup job.
const (
	// opsCleanupCronStopTimeout bounds how long to wait for the cron
	// scheduler to stop before logging a timeout and moving on.
	opsCleanupCronStopTimeout = 3 * time.Second

	// opsCleanupRunTimeout is the deadline given to one scheduled cleanup run.
	opsCleanupRunTimeout = 30 * time.Minute

	// opsCleanupHeartbeatTimeout is the deadline for writing the job
	// heartbeat record after a run.
	opsCleanupHeartbeatTimeout = 2 * time.Second
)
|
||||||
|
|
||||||
|
// opsCleanupTarget describes one table to clean in a single cleanup pass.
//
// Field order is significant: values of this type are built with positional
// composite literals, so fields must not be reordered.
type opsCleanupTarget struct {
	// retentionDays is the retention setting that is translated into a
	// cleanup action (skip / truncate / delete older than cutoff).
	retentionDays int
	// table is the relation to clean; timeCol is the column compared
	// against the cutoff.
	table, timeCol string
	// castDate requests that the cutoff parameter be cast to a date before
	// comparison.
	castDate bool
	// counter receives the number of rows removed from this table.
	counter *int64
}
|
||||||
|
|
||||||
|
// opsCleanupDeletedCounts holds the number of rows removed from each ops
// table during one cleanup run.
type opsCleanupDeletedCounts struct {
	errorLogs     int64 // rows removed from the error log table
	retryAttempts int64 // rows removed from the retry attempts table
	alertEvents   int64 // rows removed from the alert events table
	systemLogs    int64 // rows removed from the system logs table
	logAudits     int64 // rows removed from the system-log cleanup audit table
	systemMetrics int64 // rows removed from the system metrics table
	hourlyPreagg  int64 // rows removed from the hourly pre-aggregation table
	dailyPreagg   int64 // rows removed from the daily pre-aggregation table
}

// String renders the counters as space-separated key=value pairs in a fixed
// order, suitable for log lines and heartbeat result text.
func (c opsCleanupDeletedCounts) String() string {
	const layout = "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d"
	return fmt.Sprintf(layout,
		c.errorLogs, c.retryAttempts, c.alertEvents, c.systemLogs,
		c.logAudits, c.systemMetrics, c.hourlyPreagg, c.dailyPreagg,
	)
}
|
||||||
|
|
||||||
|
// opsCleanupPlan translates a "retention days" setting into a concrete
// cleanup action.
//
//   - days < 0:  skip this cleanup entirely (ok=false); kept for
//     compatibility with older data.
//   - days == 0: clear the whole table with TRUNCATE (truncate=true, ok=true);
//     cutoff is the zero time and must be ignored.
//   - days > 0:  batch-delete rows older than now minus N days
//     (cutoff = now - N days, truncate=false, ok=true).
func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) {
	switch {
	case days < 0:
		return time.Time{}, false, false
	case days == 0:
		return time.Time{}, true, true
	default:
		return now.AddDate(0, 0, -days), false, true
	}
}
|
||||||
|
|
||||||
|
func opsCleanupRunOne(
|
||||||
|
ctx context.Context,
|
||||||
|
db *sql.DB,
|
||||||
|
truncate bool,
|
||||||
|
cutoff time.Time,
|
||||||
|
table, timeCol string,
|
||||||
|
castDate bool,
|
||||||
|
batchSize int,
|
||||||
|
) (int64, error) {
|
||||||
|
if truncate {
|
||||||
|
return truncateOpsTable(ctx, db, table)
|
||||||
|
}
|
||||||
|
return deleteOldRowsByID(ctx, db, table, timeCol, cutoff, batchSize, castDate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteOldRowsByID(
|
||||||
|
ctx context.Context,
|
||||||
|
db *sql.DB,
|
||||||
|
table string,
|
||||||
|
timeColumn string,
|
||||||
|
cutoff time.Time,
|
||||||
|
batchSize int,
|
||||||
|
castCutoffToDate bool,
|
||||||
|
) (int64, error) {
|
||||||
|
if db == nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
if batchSize <= 0 {
|
||||||
|
batchSize = opsCleanupBatchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
where := fmt.Sprintf("%s < $1", timeColumn)
|
||||||
|
if castCutoffToDate {
|
||||||
|
where = fmt.Sprintf("%s < $1::date", timeColumn)
|
||||||
|
}
|
||||||
|
|
||||||
|
q := fmt.Sprintf(`
|
||||||
|
WITH batch AS (
|
||||||
|
SELECT id FROM %s
|
||||||
|
WHERE %s
|
||||||
|
ORDER BY id
|
||||||
|
LIMIT $2
|
||||||
|
)
|
||||||
|
DELETE FROM %s
|
||||||
|
WHERE id IN (SELECT id FROM batch)
|
||||||
|
`, table, where, table)
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
for {
|
||||||
|
res, err := db.ExecContext(ctx, q, cutoff, batchSize)
|
||||||
|
if err != nil {
|
||||||
|
if isMissingRelationError(err) {
|
||||||
|
return total, nil
|
||||||
|
}
|
||||||
|
return total, err
|
||||||
|
}
|
||||||
|
affected, err := res.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return total, err
|
||||||
|
}
|
||||||
|
total += affected
|
||||||
|
if affected == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。
|
||||||
|
func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) {
|
||||||
|
if db == nil {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
var count int64
|
||||||
|
if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil {
|
||||||
|
if isMissingRelationError(err) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("count %s: %w", table, err)
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil {
|
||||||
|
if isMissingRelationError(err) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("truncate %s: %w", table, err)
|
||||||
|
}
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isMissingRelationError reports whether err means "the table does not
// exist", so that cleanup can silently skip tables that are absent.
//
// Detection is done in two ways:
//  1. If any error in err's chain exposes a SQLState() string method and the
//     state is 42P01 (PostgreSQL undefined_table), the answer is true. This
//     does not depend on the message text, so it keeps working when the
//     server localizes its messages.
//  2. Otherwise it falls back to a case-insensitive text match requiring both
//     "relation" and "does not exist" in the message.
//
// A nil err returns false.
func isMissingRelationError(err error) bool {
	if err == nil {
		return false
	}

	// SQLSTATE for undefined_table.
	const undefinedTable = "42P01"

	var coded interface{ SQLState() string }
	if errors.As(err, &coded) && coded.SQLState() == undefinedTable {
		return true
	}

	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "does not exist") && strings.Contains(msg, "relation")
}
|
||||||
@ -194,3 +194,64 @@ func TestUpdateOpsAdvancedSettings_TriggersReload(t *testing.T) {
|
|||||||
t.Fatalf("expected reloader.Reload called once, got %d", reloader.calls)
|
t.Fatalf("expected reloader.Reload called once, got %d", reloader.calls)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReload_BeforeStart_IsNoop(t *testing.T) {
|
||||||
|
svc := &OpsCleanupService{}
|
||||||
|
if err := svc.Reload(context.Background()); err != nil {
|
||||||
|
t.Fatalf("Reload before Start should return nil, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReload_AfterStop_IsNoop(t *testing.T) {
|
||||||
|
svc := &OpsCleanupService{started: true, stopped: true}
|
||||||
|
if err := svc.Reload(context.Background()); err != nil {
|
||||||
|
t.Fatalf("Reload after Stop should return nil, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateOpsAdvancedSettings_NilReloader_NoPanic(t *testing.T) {
|
||||||
|
repo := newRuntimeSettingRepoStub()
|
||||||
|
svc := &OpsService{settingRepo: repo}
|
||||||
|
// cleanupReloader intentionally nil
|
||||||
|
|
||||||
|
cfg := defaultOpsAdvancedSettings()
|
||||||
|
cfg.DataRetention.ErrorLogRetentionDays = 7
|
||||||
|
|
||||||
|
// should not panic
|
||||||
|
if _, err := svc.UpdateOpsAdvancedSettings(context.Background(), cfg); err != nil {
|
||||||
|
t.Fatalf("update with nil reloader: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_IdempotentSecondCall(t *testing.T) {
|
||||||
|
svc := &OpsCleanupService{started: true}
|
||||||
|
svc.Start() // second call should be noop, not panic
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRefreshEffectiveBeforeRun_UpdatesSnapshot(t *testing.T) {
|
||||||
|
repo := newRuntimeSettingRepoStub()
|
||||||
|
base := config.OpsCleanupConfig{
|
||||||
|
Enabled: true,
|
||||||
|
Schedule: "0 2 * * *",
|
||||||
|
ErrorLogRetentionDays: 30,
|
||||||
|
}
|
||||||
|
svc := makeOverlayService(repo, base)
|
||||||
|
svc.computeEffectiveLocked(context.Background())
|
||||||
|
|
||||||
|
if svc.effective.ErrorLogRetentionDays != 30 {
|
||||||
|
t.Fatalf("initial retention should be 30, got %d", svc.effective.ErrorLogRetentionDays)
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulate UI change
|
||||||
|
writeAdvancedSettings(t, repo, OpsDataRetentionSettings{
|
||||||
|
CleanupEnabled: true,
|
||||||
|
CleanupSchedule: "0 * * * *",
|
||||||
|
ErrorLogRetentionDays: 7,
|
||||||
|
})
|
||||||
|
|
||||||
|
svc.refreshEffectiveBeforeRun(context.Background())
|
||||||
|
snap := svc.snapshotEffective()
|
||||||
|
if snap.ErrorLogRetentionDays != 7 {
|
||||||
|
t.Fatalf("after refresh, retention should be 7, got %d", snap.ErrorLogRetentionDays)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -129,7 +129,7 @@ func (s *OpsCleanupService) stopCronLocked() {
|
|||||||
ctx := s.cron.Stop()
|
ctx := s.cron.Stop()
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case <-time.After(3 * time.Second):
|
case <-time.After(opsCleanupCronStopTimeout):
|
||||||
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out")
|
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out")
|
||||||
}
|
}
|
||||||
s.cron = nil
|
s.cron = nil
|
||||||
@ -148,7 +148,7 @@ func (s *OpsCleanupService) applyScheduleLocked(ctx context.Context) error {
|
|||||||
|
|
||||||
schedule := strings.TrimSpace(s.effective.Schedule)
|
schedule := strings.TrimSpace(s.effective.Schedule)
|
||||||
if schedule == "" {
|
if schedule == "" {
|
||||||
schedule = "0 2 * * *"
|
schedule = opsCleanupDefaultSchedule
|
||||||
}
|
}
|
||||||
|
|
||||||
loc := time.Local
|
loc := time.Local
|
||||||
@ -261,7 +261,7 @@ func (s *OpsCleanupService) runScheduled() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupRunTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// 让 retention 改动当次生效(schedule/enabled 改动需要 Reload)。
|
// 让 retention 改动当次生效(schedule/enabled 改动需要 Reload)。
|
||||||
@ -288,50 +288,6 @@ func (s *OpsCleanupService) runScheduled() {
|
|||||||
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts)
|
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts)
|
||||||
}
|
}
|
||||||
|
|
||||||
type opsCleanupDeletedCounts struct {
|
|
||||||
errorLogs int64
|
|
||||||
retryAttempts int64
|
|
||||||
alertEvents int64
|
|
||||||
systemLogs int64
|
|
||||||
logAudits int64
|
|
||||||
systemMetrics int64
|
|
||||||
hourlyPreagg int64
|
|
||||||
dailyPreagg int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c opsCleanupDeletedCounts) String() string {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
"error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
|
|
||||||
c.errorLogs,
|
|
||||||
c.retryAttempts,
|
|
||||||
c.alertEvents,
|
|
||||||
c.systemLogs,
|
|
||||||
c.logAudits,
|
|
||||||
c.systemMetrics,
|
|
||||||
c.hourlyPreagg,
|
|
||||||
c.dailyPreagg,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// opsCleanupPlan 把"保留天数"翻译成具体的清理动作。
|
|
||||||
// - days < 0 → 跳过该项清理(ok=false),保留兼容老数据
|
|
||||||
// - days == 0 → TRUNCATE TABLE(O(1) 全清),truncate=true
|
|
||||||
// - days > 0 → 批量 DELETE 早于 now-N天 的行,cutoff = now - N 天
|
|
||||||
//
|
|
||||||
// 之所以 days==0 走 TRUNCATE 而非"now+24h cutoff + DELETE":
|
|
||||||
// - 速度从 O(N) 降到 O(1),对百万行级表毫秒完成
|
|
||||||
// - 无 WAL 写入、无后续 VACUUM 压力
|
|
||||||
// - 这些 ops 表只有 cleanup 任务自己写,TRUNCATE 的 ACCESS EXCLUSIVE 锁影响可忽略
|
|
||||||
func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) {
|
|
||||||
if days < 0 {
|
|
||||||
return time.Time{}, false, false
|
|
||||||
}
|
|
||||||
if days == 0 {
|
|
||||||
return time.Time{}, true, true
|
|
||||||
}
|
|
||||||
return now.AddDate(0, 0, -days), false, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
|
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
|
||||||
out := opsCleanupDeletedCounts{}
|
out := opsCleanupDeletedCounts{}
|
||||||
if s == nil || s.db == nil || s.cfg == nil {
|
if s == nil || s.db == nil || s.cfg == nil {
|
||||||
@ -339,75 +295,29 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
|
|||||||
}
|
}
|
||||||
|
|
||||||
effective := s.snapshotEffective()
|
effective := s.snapshotEffective()
|
||||||
|
|
||||||
batchSize := 5000
|
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
// runOne 把"truncate? cutoff? batched delete?"封装到一处,
|
targets := []opsCleanupTarget{
|
||||||
// 让三组清理(错误日志类 / 分钟指标 / 小时+日预聚合)调用方只关心表名和列名。
|
{effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs},
|
||||||
runOne := func(truncate bool, cutoff time.Time, table, timeCol string, castDate bool) (int64, error) {
|
{effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts},
|
||||||
if truncate {
|
{effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents},
|
||||||
return truncateOpsTable(ctx, s.db, table)
|
{effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs},
|
||||||
}
|
{effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits},
|
||||||
return deleteOldRowsByID(ctx, s.db, table, timeCol, cutoff, batchSize, castDate)
|
{effective.MinuteMetricsRetentionDays, "ops_system_metrics", "created_at", false, &out.systemMetrics},
|
||||||
|
{effective.HourlyMetricsRetentionDays, "ops_metrics_hourly", "bucket_start", false, &out.hourlyPreagg},
|
||||||
|
{effective.HourlyMetricsRetentionDays, "ops_metrics_daily", "bucket_date", true, &out.dailyPreagg},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error-like tables: error logs / retry attempts / alert events / system logs / cleanup audits.
|
for _, t := range targets {
|
||||||
if cutoff, truncate, ok := opsCleanupPlan(now, effective.ErrorLogRetentionDays); ok {
|
cutoff, truncate, ok := opsCleanupPlan(now, t.retentionDays)
|
||||||
n, err := runOne(truncate, cutoff, "ops_error_logs", "created_at", false)
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
n, err := opsCleanupRunOne(ctx, s.db, truncate, cutoff, t.table, t.timeCol, t.castDate, opsCleanupBatchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, err
|
return out, err
|
||||||
}
|
}
|
||||||
out.errorLogs = n
|
*t.counter = n
|
||||||
|
|
||||||
n, err = runOne(truncate, cutoff, "ops_retry_attempts", "created_at", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.retryAttempts = n
|
|
||||||
|
|
||||||
n, err = runOne(truncate, cutoff, "ops_alert_events", "created_at", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.alertEvents = n
|
|
||||||
|
|
||||||
n, err = runOne(truncate, cutoff, "ops_system_logs", "created_at", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.systemLogs = n
|
|
||||||
|
|
||||||
n, err = runOne(truncate, cutoff, "ops_system_log_cleanup_audits", "created_at", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.logAudits = n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Minute-level metrics snapshots.
|
|
||||||
if cutoff, truncate, ok := opsCleanupPlan(now, effective.MinuteMetricsRetentionDays); ok {
|
|
||||||
n, err := runOne(truncate, cutoff, "ops_system_metrics", "created_at", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.systemMetrics = n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pre-aggregation tables (hourly/daily).
|
|
||||||
if cutoff, truncate, ok := opsCleanupPlan(now, effective.HourlyMetricsRetentionDays); ok {
|
|
||||||
n, err := runOne(truncate, cutoff, "ops_metrics_hourly", "bucket_start", false)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.hourlyPreagg = n
|
|
||||||
|
|
||||||
n, err = runOne(truncate, cutoff, "ops_metrics_daily", "bucket_date", true)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
out.dailyPreagg = n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。
|
// Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。
|
||||||
@ -422,100 +332,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteOldRowsByID(
|
|
||||||
ctx context.Context,
|
|
||||||
db *sql.DB,
|
|
||||||
table string,
|
|
||||||
timeColumn string,
|
|
||||||
cutoff time.Time,
|
|
||||||
batchSize int,
|
|
||||||
castCutoffToDate bool,
|
|
||||||
) (int64, error) {
|
|
||||||
if db == nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
if batchSize <= 0 {
|
|
||||||
batchSize = 5000
|
|
||||||
}
|
|
||||||
|
|
||||||
where := fmt.Sprintf("%s < $1", timeColumn)
|
|
||||||
if castCutoffToDate {
|
|
||||||
where = fmt.Sprintf("%s < $1::date", timeColumn)
|
|
||||||
}
|
|
||||||
|
|
||||||
q := fmt.Sprintf(`
|
|
||||||
WITH batch AS (
|
|
||||||
SELECT id FROM %s
|
|
||||||
WHERE %s
|
|
||||||
ORDER BY id
|
|
||||||
LIMIT $2
|
|
||||||
)
|
|
||||||
DELETE FROM %s
|
|
||||||
WHERE id IN (SELECT id FROM batch)
|
|
||||||
`, table, where, table)
|
|
||||||
|
|
||||||
var total int64
|
|
||||||
for {
|
|
||||||
res, err := db.ExecContext(ctx, q, cutoff, batchSize)
|
|
||||||
if err != nil {
|
|
||||||
// If ops tables aren't present yet (partial deployments), treat as no-op.
|
|
||||||
if isMissingRelationError(err) {
|
|
||||||
return total, nil
|
|
||||||
}
|
|
||||||
return total, err
|
|
||||||
}
|
|
||||||
affected, err := res.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return total, err
|
|
||||||
}
|
|
||||||
total += affected
|
|
||||||
if affected == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。
|
|
||||||
//
|
|
||||||
// 与 deleteOldRowsByID 的差异:
|
|
||||||
// - 不可指定 WHERE 条件,仅用于 days==0 的"清空全部"语义
|
|
||||||
// - O(1) 释放表的物理存储页,毫秒级完成,无 WAL 写入、无 VACUUM 压力
|
|
||||||
// - 需要 ACCESS EXCLUSIVE 锁,但 ops 表只有清理任务自己写入,瞬间锁影响可忽略
|
|
||||||
//
|
|
||||||
// 表不存在(部分部署)静默返回 0,与 deleteOldRowsByID 保持一致。
|
|
||||||
func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) {
|
|
||||||
if db == nil {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
var count int64
|
|
||||||
if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil {
|
|
||||||
if isMissingRelationError(err) {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
return 0, fmt.Errorf("count %s: %w", table, err)
|
|
||||||
}
|
|
||||||
if count == 0 {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil {
|
|
||||||
if isMissingRelationError(err) {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
return 0, fmt.Errorf("truncate %s: %w", table, err)
|
|
||||||
}
|
|
||||||
return count, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// isMissingRelationError 判断 PG 报错是否为"表不存在",用于让清理任务在部分部署场景静默跳过。
|
|
||||||
func isMissingRelationError(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
s := strings.ToLower(err.Error())
|
|
||||||
return strings.Contains(s, "does not exist") && strings.Contains(s, "relation")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
|
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return nil, false
|
return nil, false
|
||||||
@ -564,7 +380,7 @@ func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration tim
|
|||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
durMs := duration.Milliseconds()
|
durMs := duration.Milliseconds()
|
||||||
result := truncateString(counts.String(), 2048)
|
result := truncateString(counts.String(), 2048)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
|
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
|
||||||
JobName: opsCleanupJobName,
|
JobName: opsCleanupJobName,
|
||||||
@ -582,7 +398,7 @@ func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.
|
|||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
durMs := duration.Milliseconds()
|
durMs := duration.Milliseconds()
|
||||||
msg := truncateString(err.Error(), 2048)
|
msg := truncateString(err.Error(), 2048)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
|
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
|
||||||
JobName: opsCleanupJobName,
|
JobName: opsCleanupJobName,
|
||||||
|
|||||||
@ -361,7 +361,7 @@ func defaultOpsAdvancedSettings() *OpsAdvancedSettings {
|
|||||||
return &OpsAdvancedSettings{
|
return &OpsAdvancedSettings{
|
||||||
DataRetention: OpsDataRetentionSettings{
|
DataRetention: OpsDataRetentionSettings{
|
||||||
CleanupEnabled: false,
|
CleanupEnabled: false,
|
||||||
CleanupSchedule: "0 2 * * *",
|
CleanupSchedule: opsCleanupDefaultSchedule,
|
||||||
ErrorLogRetentionDays: 30,
|
ErrorLogRetentionDays: 30,
|
||||||
MinuteMetricsRetentionDays: 30,
|
MinuteMetricsRetentionDays: 30,
|
||||||
HourlyMetricsRetentionDays: 30,
|
HourlyMetricsRetentionDays: 30,
|
||||||
@ -386,7 +386,7 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) {
|
|||||||
}
|
}
|
||||||
cfg.DataRetention.CleanupSchedule = strings.TrimSpace(cfg.DataRetention.CleanupSchedule)
|
cfg.DataRetention.CleanupSchedule = strings.TrimSpace(cfg.DataRetention.CleanupSchedule)
|
||||||
if cfg.DataRetention.CleanupSchedule == "" {
|
if cfg.DataRetention.CleanupSchedule == "" {
|
||||||
cfg.DataRetention.CleanupSchedule = "0 2 * * *"
|
cfg.DataRetention.CleanupSchedule = opsCleanupDefaultSchedule
|
||||||
}
|
}
|
||||||
// 保留天数:0 表示每次定时清理全部(清空所有),> 0 表示按天数保留;
|
// 保留天数:0 表示每次定时清理全部(清空所有),> 0 表示按天数保留;
|
||||||
// 仅在拿到非法的负数时回填默认值,避免覆盖用户主动设的 0。
|
// 仅在拿到非法的负数时回填默认值,避免覆盖用户主动设的 0。
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user