diff --git a/backend/internal/handler/openai_gateway_handler_test.go b/backend/internal/handler/openai_gateway_handler_test.go index d7d21fac..7de30e9c 100644 --- a/backend/internal/handler/openai_gateway_handler_test.go +++ b/backend/internal/handler/openai_gateway_handler_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "time" @@ -740,16 +741,31 @@ func (r *contentModerationHandlerSettingRepo) Delete(ctx context.Context, key st } type contentModerationHandlerTestRepo struct { + mu sync.Mutex logs []service.ContentModerationLog } func (r *contentModerationHandlerTestRepo) CreateLog(ctx context.Context, log *service.ContentModerationLog) error { if log != nil { + r.mu.Lock() + defer r.mu.Unlock() r.logs = append(r.logs, *log) } return nil } +func (r *contentModerationHandlerTestRepo) resetLogs() { + r.mu.Lock() + defer r.mu.Unlock() + r.logs = nil +} + +func (r *contentModerationHandlerTestRepo) logSnapshot() []service.ContentModerationLog { + r.mu.Lock() + defer r.mu.Unlock() + return append([]service.ContentModerationLog(nil), r.logs...) +} + func (r *contentModerationHandlerTestRepo) ListLogs(ctx context.Context, filter service.ContentModerationLogFilter) ([]service.ContentModerationLog, *pagination.PaginationResult, error) { return nil, nil, nil } @@ -808,7 +824,10 @@ func TestOpenAIResponsesWebSocket_ContentModerationBlocksFirstFrame(t *testing.T }) require.NoError(t, err) require.True(t, decision.Blocked) - repo.logs = nil + require.Eventually(t, func() bool { + return len(repo.logSnapshot()) == 1 + }, time.Second, 10*time.Millisecond) + repo.resetLogs() h := &OpenAIGatewayHandler{ gatewayService: &service.OpenAIGatewayService{}, billingCacheService: &service.BillingCacheService{}, @@ -848,10 +867,11 @@ func TestOpenAIResponsesWebSocket_ContentModerationBlocksFirstFrame(t *testing.T require.Equal(t, coderws.StatusPolicyViolation, closeErr.Code) require.Contains(t, closeErr.Reason, "内容审计测试阻断") } - require.Len(t, repo.logs, 1) - require.True(t, repo.logs[0].Flagged) - require.Equal(t, service.ContentModerationActionBlock, repo.logs[0].Action) - require.Equal(t, "bad prompt", repo.logs[0].InputExcerpt) + logs := repo.logSnapshot() + require.Len(t, logs, 1) + require.True(t, logs[0].Flagged) + require.Equal(t, service.ContentModerationActionBlock, logs[0].Action) + require.Equal(t, "bad prompt", logs[0].InputExcerpt) } func TestOpenAIResponsesWebSocket_PassthroughUsageLogPersistsUserAgentAndReasoningEffort(t *testing.T) { diff --git a/backend/internal/repository/content_moderation_repo.go b/backend/internal/repository/content_moderation_repo.go index 6ada004a..9b19cce9 100644 --- a/backend/internal/repository/content_moderation_repo.go +++ b/backend/internal/repository/content_moderation_repo.go @@ -192,6 +192,7 @@ SELECT COUNT(*) FROM content_moderation_logs WHERE user_id = $1 AND flagged = TRUE + AND action <> 'hash_block' AND created_at >= $2 AND created_at > COALESCE((SELECT at FROM last_auto_ban), '-infinity'::timestamptz) `, userID, since).Scan(&count) @@ -246,7 +247,7 @@ func buildContentModerationLogWhere(filter service.ContentModerationLogFilter) ( case "hit", "flagged": where = append(where, "l.flagged = TRUE") case "blocked", "block": - where = append(where, "l.action = 'block'") + where = append(where, "l.action IN ('block', 'keyword_block', 'hash_block')") case "pass", "allow": where = append(where, "l.flagged = FALSE AND l.error = ''") case "error": diff --git a/backend/internal/repository/content_moderation_repo_test.go b/backend/internal/repository/content_moderation_repo_test.go new file mode 100644 index 00000000..6d5faa12 --- /dev/null +++ b/backend/internal/repository/content_moderation_repo_test.go @@ -0,0 +1,40 @@ +package repository + +import ( + "context" + "regexp" + "strings" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/stretchr/testify/require" +) + +func TestBuildContentModerationLogWhere_BlockedIncludesAllBlockActions(t *testing.T) { + where, args := buildContentModerationLogWhere(service.ContentModerationLogFilter{Result: "blocked"}) + + require.Empty(t, args) + sql := strings.Join(where, " AND ") + require.Contains(t, sql, "l.action IN ('block', 'keyword_block', 'hash_block')") + require.NotContains(t, sql, "l.action = 'block'") +} + +func TestContentModerationRepositoryCountFlaggedByUserSince_ExcludesHashBlock(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer func() { _ = db.Close() }() + + repo := NewContentModerationRepository(db) + since := time.Now().Add(-time.Hour) + mock.ExpectQuery(regexp.QuoteMeta("AND action <> 'hash_block'")). + WithArgs(int64(1001), since). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(2)) + + count, err := repo.CountFlaggedByUserSince(context.Background(), 1001, since) + + require.NoError(t, err) + require.Equal(t, 2, count) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/backend/internal/service/content_moderation.go b/backend/internal/service/content_moderation.go index a5a84d7b..ee1fca41 100644 --- a/backend/internal/service/content_moderation.go +++ b/backend/internal/service/content_moderation.go @@ -211,6 +211,20 @@ type ContentModerationAPIKeyStatus struct { Configured bool `json:"configured"` } +type ContentModerationAPIKeyLoad struct { + Index int `json:"index"` + KeyHash string `json:"key_hash"` + Masked string `json:"masked"` + Status string `json:"status"` + Active int64 `json:"active"` + Total int64 `json:"total"` + Success int64 `json:"success"` + Errors int64 `json:"errors"` + AvgLatencyMS int64 `json:"avg_latency_ms"` + LastLatencyMS int `json:"last_latency_ms"` + LastHTTPStatus int `json:"last_http_status"` +} + type TestContentModerationAPIKeysInput struct { APIKeys []string `json:"api_keys"` BaseURL string `json:"base_url"` @@ -399,25 +413,35 @@ type ContentModerationCleanupResult struct { } type ContentModerationRuntimeStatus struct { - Enabled bool `json:"enabled"` - RiskControlEnabled bool `json:"risk_control_enabled"` - Mode string `json:"mode"` - WorkerCount int `json:"worker_count"` - MaxWorkers int `json:"max_workers"` - ActiveWorkers int `json:"active_workers"` - IdleWorkers int `json:"idle_workers"` - QueueSize int `json:"queue_size"` - QueueLength int `json:"queue_length"` - QueueUsagePercent float64 `json:"queue_usage_percent"` - Enqueued int64 `json:"enqueued"` - Dropped int64 `json:"dropped"` - Processed int64 `json:"processed"` - Errors int64 `json:"errors"` - APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"` - FlaggedHashCount int64 `json:"flagged_hash_count"` - LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"` - LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"` - LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"` + Enabled bool `json:"enabled"` + RiskControlEnabled bool `json:"risk_control_enabled"` + Mode string `json:"mode"` + WorkerCount int `json:"worker_count"` + MaxWorkers int `json:"max_workers"` + ActiveWorkers int `json:"active_workers"` + IdleWorkers int `json:"idle_workers"` + QueueSize int `json:"queue_size"` + QueueLength int `json:"queue_length"` + QueueUsagePercent float64 `json:"queue_usage_percent"` + Enqueued int64 `json:"enqueued"` + Dropped int64 `json:"dropped"` + Processed int64 `json:"processed"` + Errors int64 `json:"errors"` + PreBlockActive int `json:"pre_block_active"` + PreBlockChecked int64 `json:"pre_block_checked"` + PreBlockAllowed int64 `json:"pre_block_allowed"` + PreBlockBlocked int64 `json:"pre_block_blocked"` + PreBlockErrors int64 `json:"pre_block_errors"` + PreBlockAvgLatencyMS int64 `json:"pre_block_avg_latency_ms"` + PreBlockAPIKeyActive int64 `json:"pre_block_api_key_active"` + PreBlockAPIKeyAvailableCount int64 `json:"pre_block_api_key_available_count"` + PreBlockAPIKeyTotalCalls int64 `json:"pre_block_api_key_total_calls"` + PreBlockAPIKeyLoads []ContentModerationAPIKeyLoad `json:"pre_block_api_key_loads"` + APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"` + FlaggedHashCount int64 `json:"flagged_hash_count"` + LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"` + LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"` + LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"` } type ContentModerationUnbanUserResult struct { @@ -466,6 +490,12 @@ type ContentModerationService struct { asyncDropped atomic.Int64 asyncProcessed atomic.Int64 asyncErrors atomic.Int64 + preBlockActive atomic.Int64 + preBlockChecked atomic.Int64 + preBlockAllowed atomic.Int64 + preBlockBlocked atomic.Int64 + preBlockErrors atomic.Int64 + preBlockLatencyTotalMS atomic.Int64 lastCleanupUnix atomic.Int64 lastCleanupDeletedHit atomic.Int64 lastCleanupDeletedNonHit atomic.Int64 @@ -474,10 +504,14 @@ type ContentModerationService struct { } type contentModerationTask struct { - input ContentModerationCheckInput - content ContentModerationInput - inputHash string - enqueuedAt time.Time + input ContentModerationCheckInput + content ContentModerationInput + inputHash string + log *ContentModerationLog + config *ContentModerationConfig + recordHash bool + applySideEffects bool + enqueuedAt time.Time } type contentModerationKeyHealth struct { @@ -491,6 +525,11 @@ type contentModerationKeyHealth struct { LastLatencyMS int LastHTTPStatus int LastTested bool + SyncActive int64 + SyncTotal int64 + SyncSuccess int64 + SyncErrors int64 + SyncLatencyMS int64 } func NewContentModerationService( @@ -827,9 +866,11 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer "protocol", input.Protocol, "text_runes", len([]rune(content.Text)), "image_count", len(content.Images)) + hashText := content.Hash() if cfg.Mode == ContentModerationModePreBlock { if cfg.KeywordBlockingMode != ContentModerationKeywordModeAPIOnly && len(cfg.BlockedKeywords) > 0 { if keyword, hit := matchBlockedKeyword(content.Text, cfg.BlockedKeywords); hit { + s.recordPreBlockSyncMetric(0, ContentModerationActionKeywordBlock) slog.Info("content_moderation.keyword_block", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -840,8 +881,7 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer "keyword", keyword) scores := map[string]float64{contentModerationKeywordCategory: 1.0} log := s.buildLog(input, cfg, ContentModerationActionKeywordBlock, true, contentModerationKeywordCategory, 1.0, scores, content.ExcerptText(), nil, nil, "") - s.applyFlaggedSideEffects(ctx, cfg, log) - _ = s.repo.CreateLog(ctx, log) + s.enqueueRecord(input, cfg, log, hashText, false, true) return &ContentModerationDecision{ Allowed: false, Blocked: true, @@ -856,6 +896,7 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer } } if cfg.KeywordBlockingMode == ContentModerationKeywordModeKeywordOnly { + s.recordPreBlockSyncMetric(0, ContentModerationActionAllow) slog.Info("content_moderation.skip_api_keyword_only", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -865,13 +906,15 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer return allow, nil } } - hashText := content.Hash() if cfg.PreHashCheckEnabled && s.hashCache != nil { matched, err := s.hashCache.HasFlaggedInputHash(ctx, hashText) if err != nil { slog.Warn("content_moderation.hash_check_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err) } if matched { + if cfg.Mode == ContentModerationModePreBlock { + s.recordPreBlockSyncMetric(0, ContentModerationActionHashBlock) + } slog.Info("content_moderation.hash_block", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -883,6 +926,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer if message != "" { message = fmt.Sprintf("%s(hash: %s)", message, hashText) } + scores := map[string]float64{"hash": 1.0} + log := s.buildLog(input, cfg, ContentModerationActionHashBlock, true, "hash", 1.0, scores, content.ExcerptText(), nil, nil, "") + s.enqueueRecord(input, cfg, log, hashText, false, false) return &ContentModerationDecision{ Allowed: false, Blocked: true, @@ -895,6 +941,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer } } if !cfg.shouldSample(hashText) { + if cfg.Mode == ContentModerationModePreBlock { + s.recordPreBlockSyncMetric(0, ContentModerationActionAllow) + } slog.Info("content_moderation.skip_sample_rate", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -905,6 +954,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer return allow, nil } if len(cfg.apiKeys()) == 0 { + if cfg.Mode == ContentModerationModePreBlock { + s.recordPreBlockSyncMetric(0, ContentModerationActionError) + } slog.Warn("content_moderation.skip_no_audit_api_keys", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -930,10 +982,18 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer func (s *ContentModerationService) checkSync(ctx context.Context, input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string, queueDelay *int, allowBlock bool) *ContentModerationDecision { allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow} + trackPreBlock := queueDelay == nil && allowBlock && cfg != nil && cfg.Mode == ContentModerationModePreBlock + if trackPreBlock { + s.preBlockActive.Add(1) + defer s.preBlockActive.Add(-1) + } start := time.Now() - result, err := s.callModeration(ctx, cfg, content.ModerationInput()) + result, err := s.callModeration(ctx, cfg, content.ModerationInput(), trackPreBlock) latency := int(time.Since(start).Milliseconds()) if err != nil { + if trackPreBlock { + s.recordPreBlockSyncMetric(latency, ContentModerationActionError) + } slog.Warn("content_moderation.audit_api_failed", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -962,6 +1022,9 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM action = ContentModerationActionBlock blocked = true } + if trackPreBlock { + s.recordPreBlockSyncMetric(latency, action) + } slog.Info("content_moderation.audit_result", "user_id", input.UserID, "api_key_id", input.APIKeyID, @@ -980,13 +1043,11 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM "queue_delay_ms", queueDelay) if flagged || cfg.RecordNonHits { log := s.buildLog(input, cfg, action, flagged, highestCategory, highestScore, result.CategoryScores, content.ExcerptText(), &latency, queueDelay, "") - if flagged && s.hashCache != nil { - if err := s.hashCache.RecordFlaggedInputHash(ctx, hashText); err != nil { - slog.Warn("content_moderation.record_hash_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err) - } + if queueDelay == nil && cfg.Mode == ContentModerationModePreBlock { + s.enqueueRecord(input, cfg, log, hashText, flagged, flagged) + } else { + s.persistContentModerationLog(ctx, cfg, log, hashText, flagged, flagged) } - s.applyFlaggedSideEffects(ctx, cfg, log) - _ = s.repo.CreateLog(ctx, log) } if blocked { return &ContentModerationDecision{ @@ -1012,6 +1073,25 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM } } +func (s *ContentModerationService) recordPreBlockSyncMetric(latencyMS int, action string) { + if s == nil { + return + } + s.preBlockChecked.Add(1) + if latencyMS < 0 { + latencyMS = 0 + } + s.preBlockLatencyTotalMS.Add(int64(latencyMS)) + switch action { + case ContentModerationActionBlock, ContentModerationActionHashBlock, ContentModerationActionKeywordBlock: + s.preBlockBlocked.Add(1) + case ContentModerationActionError: + s.preBlockErrors.Add(1) + default: + s.preBlockAllowed.Add(1) + } +} + func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string) { if s == nil || s.asyncQueue == nil { return @@ -1040,11 +1120,49 @@ func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInpu } } +func (s *ContentModerationService) enqueueRecord(input ContentModerationCheckInput, cfg *ContentModerationConfig, log *ContentModerationLog, inputHash string, recordHash bool, applySideEffects bool) { + if s == nil || s.asyncQueue == nil || log == nil { + return + } + queueSize := defaultContentModerationQueueSize + if cfg != nil && cfg.QueueSize > 0 { + queueSize = cfg.QueueSize + } + if len(s.asyncQueue) >= queueSize { + slog.Warn("content_moderation.record_queue_full", + "user_id", input.UserID, + "endpoint", input.Endpoint, + "action", log.Action, + "queue_size", queueSize) + s.asyncDropped.Add(1) + return + } + task := contentModerationTask{ + input: input, + inputHash: inputHash, + log: log, + config: cloneContentModerationConfig(cfg), + recordHash: recordHash, + applySideEffects: applySideEffects, + enqueuedAt: time.Now(), + } + select { + case s.asyncQueue <- task: + s.asyncEnqueued.Add(1) + default: + slog.Warn("content_moderation.record_queue_full", + "user_id", input.UserID, + "endpoint", input.Endpoint, + "action", log.Action) + s.asyncDropped.Add(1) + } +} + func (s *ContentModerationService) worker(id int) { for { ctx, cancel := context.WithTimeout(context.Background(), maxContentModerationTimeoutMS*time.Millisecond+10*time.Second) cfg, err := s.loadConfig(ctx) - if err != nil || !cfg.Enabled || cfg.Mode == ContentModerationModeOff || len(cfg.apiKeys()) == 0 || id >= cfg.WorkerCount { + if err != nil || id >= cfg.WorkerCount { cancel() time.Sleep(time.Second) continue @@ -1061,6 +1179,22 @@ func (s *ContentModerationService) worker(id int) { slog.Error("content_moderation.worker_panic", "worker_id", id, "recover", r) } }() + if task.log != nil { + s.asyncActive.Add(1) + defer s.asyncActive.Add(-1) + queueDelay := int(time.Since(task.enqueuedAt).Milliseconds()) + task.log.QueueDelayMS = &queueDelay + taskCfg := task.config + if taskCfg == nil { + taskCfg = cfg + } + s.persistContentModerationLog(ctx, taskCfg, task.log, task.inputHash, task.recordHash, task.applySideEffects) + s.asyncProcessed.Add(1) + return + } + if !cfg.Enabled || cfg.Mode == ContentModerationModeOff || len(cfg.apiKeys()) == 0 { + return + } if !cfg.includesGroup(task.input.GroupID) { return } @@ -1186,6 +1320,15 @@ func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModer if active > cfg.WorkerCount { active = cfg.WorkerCount } + preBlockActive := int(s.preBlockActive.Load()) + if preBlockActive < 0 { + preBlockActive = 0 + } + preBlockChecked := s.preBlockChecked.Load() + preBlockAvgLatency := int64(0) + if preBlockChecked > 0 { + preBlockAvgLatency = s.preBlockLatencyTotalMS.Load() / preBlockChecked + } queueLength := 0 if s.asyncQueue != nil { queueLength = len(s.asyncQueue) @@ -1208,25 +1351,35 @@ func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModer lastCleanupAt = &t } return &ContentModerationRuntimeStatus{ - Enabled: cfg.Enabled, - RiskControlEnabled: riskEnabled, - Mode: cfg.Mode, - WorkerCount: cfg.WorkerCount, - MaxWorkers: maxContentModerationWorkerCount, - ActiveWorkers: active, - IdleWorkers: cfg.WorkerCount - active, - QueueSize: cfg.QueueSize, - QueueLength: queueLength, - QueueUsagePercent: queueUsage, - Enqueued: s.asyncEnqueued.Load(), - Dropped: s.asyncDropped.Load(), - Processed: s.asyncProcessed.Load(), - Errors: s.asyncErrors.Load(), - APIKeyStatuses: s.apiKeyStatuses(cfg.apiKeys()), - FlaggedHashCount: flaggedHashCount, - LastCleanupAt: lastCleanupAt, - LastCleanupDeletedHit: s.lastCleanupDeletedHit.Load(), - LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(), + Enabled: cfg.Enabled, + RiskControlEnabled: riskEnabled, + Mode: cfg.Mode, + WorkerCount: cfg.WorkerCount, + MaxWorkers: maxContentModerationWorkerCount, + ActiveWorkers: active, + IdleWorkers: cfg.WorkerCount - active, + QueueSize: cfg.QueueSize, + QueueLength: queueLength, + QueueUsagePercent: queueUsage, + Enqueued: s.asyncEnqueued.Load(), + Dropped: s.asyncDropped.Load(), + Processed: s.asyncProcessed.Load(), + Errors: s.asyncErrors.Load(), + PreBlockActive: preBlockActive, + PreBlockChecked: preBlockChecked, + PreBlockAllowed: s.preBlockAllowed.Load(), + PreBlockBlocked: s.preBlockBlocked.Load(), + PreBlockErrors: s.preBlockErrors.Load(), + PreBlockAvgLatencyMS: preBlockAvgLatency, + PreBlockAPIKeyActive: s.preBlockAPIKeyActive(cfg.apiKeys()), + PreBlockAPIKeyAvailableCount: s.preBlockAPIKeyAvailableCount(cfg.apiKeys()), + PreBlockAPIKeyTotalCalls: s.preBlockAPIKeyTotalCalls(cfg.apiKeys()), + PreBlockAPIKeyLoads: s.preBlockAPIKeyLoads(cfg.apiKeys()), + APIKeyStatuses: s.apiKeyStatuses(cfg.apiKeys()), + FlaggedHashCount: flaggedHashCount, + LastCleanupAt: lastCleanupAt, + LastCleanupDeletedHit: s.lastCleanupDeletedHit.Load(), + LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(), }, nil } @@ -1325,7 +1478,7 @@ func (s *ContentModerationService) validateConfig(ctx context.Context, cfg *Cont return nil } -func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any) (*moderationAPIResult, error) { +func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any, trackKeyLoad ...bool) (*moderationAPIResult, error) { attempts := cfg.RetryCount + 1 if attempts <= 0 { attempts = 1 @@ -1333,6 +1486,7 @@ func (s *ContentModerationService) callModeration(ctx context.Context, cfg *Cont if attempts > maxContentModerationRetryCount+1 { attempts = maxContentModerationRetryCount + 1 } + trackLoad := len(trackKeyLoad) > 0 && trackKeyLoad[0] var lastErr error for attempt := 0; attempt < attempts; attempt++ { key, ok := s.nextUsableAPIKey(cfg) @@ -1340,14 +1494,23 @@ func (s *ContentModerationService) callModeration(ctx context.Context, cfg *Cont lastErr = errors.New("no moderation api key available") break } + if trackLoad { + s.beginModerationAPIKeyCall(key) + } start := time.Now() httpStatus := 0 result, err := s.callModerationOnceWithInput(ctx, cfg, key, input, &httpStatus) latency := int(time.Since(start).Milliseconds()) if err == nil { + if trackLoad { + s.finishModerationAPIKeyCall(key, latency, true) + } s.markAPIKeySuccess(key, latency, httpStatus) return result, nil } + if trackLoad { + s.finishModerationAPIKeyCall(key, latency, false) + } s.markAPIKeyError(key, err.Error(), latency, httpStatus) lastErr = err if httpStatus == http.StatusBadRequest { @@ -1452,10 +1615,32 @@ func (s *ContentModerationService) buildLog(input ContentModerationCheckInput, c } } -func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) { - if s == nil || cfg == nil || log == nil || !log.Flagged || log.UserID == nil || *log.UserID <= 0 { +func (s *ContentModerationService) persistContentModerationLog(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog, hashText string, recordHash bool, applySideEffects bool) { + if s == nil || log == nil { return } + if recordHash && s.hashCache != nil { + if err := s.hashCache.RecordFlaggedInputHash(ctx, hashText); err != nil { + slog.Warn("content_moderation.record_hash_failed", "user_id", contentModerationEmailUserID(log), "endpoint", log.Endpoint, "error", err) + } + } + autoBanJustApplied := false + if applySideEffects { + autoBanJustApplied = s.applyFlaggedAccountSideEffects(ctx, cfg, log) + s.sendFlaggedNotificationSideEffects(ctx, cfg, log, autoBanJustApplied) + } + if s.repo != nil { + if err := s.repo.CreateLog(ctx, log); err != nil { + slog.Warn("content_moderation.create_log_failed", "user_id", contentModerationEmailUserID(log), "endpoint", log.Endpoint, "action", log.Action, "error", err) + return + } + } +} + +func (s *ContentModerationService) applyFlaggedAccountSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) bool { + if s == nil || cfg == nil || log == nil || !log.Flagged || log.UserID == nil || *log.UserID <= 0 { + return false + } count := 1 if s.repo != nil && cfg.ViolationWindowHours > 0 { since := time.Now().Add(-time.Duration(cfg.ViolationWindowHours) * time.Hour) @@ -1469,13 +1654,13 @@ func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, user, err := s.userRepo.GetByID(ctx, *log.UserID) if err != nil { slog.Warn("content_moderation.ban_get_user_failed", "user_id", *log.UserID, "error", err) - return + return false } if user.Status != StatusDisabled { user.Status = StatusDisabled if err := s.userRepo.Update(ctx, user); err != nil { slog.Warn("content_moderation.ban_update_user_failed", "user_id", *log.UserID, "error", err) - return + return false } if s.authCacheInvalidator != nil { s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, *log.UserID) @@ -1484,7 +1669,13 @@ func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, } log.AutoBanned = true } + return autoBanJustApplied +} +func (s *ContentModerationService) sendFlaggedNotificationSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog, autoBanJustApplied bool) { + if s == nil || cfg == nil || log == nil || !log.Flagged { + return + } if s.emailService == nil || strings.TrimSpace(log.UserEmail) == "" { return } @@ -1642,6 +1833,22 @@ func defaultContentModerationConfig() *ContentModerationConfig { } } +func cloneContentModerationConfig(cfg *ContentModerationConfig) *ContentModerationConfig { + if cfg == nil { + return nil + } + clone := *cfg + clone.APIKeys = append([]string(nil), cfg.APIKeys...) + clone.GroupIDs = append([]int64(nil), cfg.GroupIDs...) + clone.BlockedKeywords = append([]string(nil), cfg.BlockedKeywords...) + clone.Thresholds = cloneFloatMap(cfg.Thresholds) + clone.ModelFilter = ContentModerationModelFilter{ + Type: cfg.ModelFilter.Type, + Models: append([]string(nil), cfg.ModelFilter.Models...), + } + return &clone +} + func (cfg *ContentModerationConfig) normalize() { if cfg.APIKey != "" { cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, cfg.APIKey)) @@ -1807,6 +2014,40 @@ func (s *ContentModerationService) isAPIKeyFrozen(key string, now time.Time) boo return state != nil && state.FrozenUntil.After(now) } +func (s *ContentModerationService) beginModerationAPIKeyCall(key string) { + hash := moderationAPIKeyHash(key) + if hash == "" || s == nil { + return + } + s.keyHealthMu.Lock() + defer s.keyHealthMu.Unlock() + state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key)) + state.SyncActive++ +} + +func (s *ContentModerationService) finishModerationAPIKeyCall(key string, latencyMS int, success bool) { + hash := moderationAPIKeyHash(key) + if hash == "" || s == nil { + return + } + if latencyMS < 0 { + latencyMS = 0 + } + s.keyHealthMu.Lock() + defer s.keyHealthMu.Unlock() + state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key)) + if state.SyncActive > 0 { + state.SyncActive-- + } + state.SyncTotal++ + state.SyncLatencyMS += int64(latencyMS) + if success { + state.SyncSuccess++ + return + } + state.SyncErrors++ +} + func (s *ContentModerationService) markAPIKeySuccess(key string, latencyMS int, httpStatus int) { hash := moderationAPIKeyHash(key) if hash == "" || s == nil { @@ -1926,6 +2167,71 @@ func (s *ContentModerationService) apiKeyStatuses(keys []string) []ContentModera return out } +func (s *ContentModerationService) preBlockAPIKeyLoads(keys []string) []ContentModerationAPIKeyLoad { + out := make([]ContentModerationAPIKeyLoad, 0, len(keys)) + for idx, key := range keys { + out = append(out, s.preBlockAPIKeyLoadForHash(idx, moderationAPIKeyHash(key), maskSecretTail(key))) + } + return out +} + +func (s *ContentModerationService) preBlockAPIKeyActive(keys []string) int64 { + var total int64 + for _, item := range s.preBlockAPIKeyLoads(keys) { + total += item.Active + } + return total +} + +func (s *ContentModerationService) preBlockAPIKeyAvailableCount(keys []string) int64 { + now := time.Now() + var count int64 + for _, key := range keys { + if !s.isAPIKeyFrozen(key, now) { + count++ + } + } + return count +} + +func (s *ContentModerationService) preBlockAPIKeyTotalCalls(keys []string) int64 { + var total int64 + for _, item := range s.preBlockAPIKeyLoads(keys) { + total += item.Total + } + return total +} + +func (s *ContentModerationService) preBlockAPIKeyLoadForHash(index int, hash string, masked string) ContentModerationAPIKeyLoad { + load := ContentModerationAPIKeyLoad{ + Index: index, + KeyHash: hash, + Masked: masked, + Status: "unknown", + } + status := s.apiKeyStatusForHash(index, hash, masked, true) + load.Status = status.Status + load.LastLatencyMS = status.LastLatencyMS + load.LastHTTPStatus = status.LastHTTPStatus + if hash == "" || s == nil { + return load + } + s.keyHealthMu.Lock() + defer s.keyHealthMu.Unlock() + state := s.keyHealth[hash] + if state == nil { + return load + } + load.Active = state.SyncActive + load.Total = state.SyncTotal + load.Success = state.SyncSuccess + load.Errors = state.SyncErrors + if state.SyncTotal > 0 { + load.AvgLatencyMS = state.SyncLatencyMS / state.SyncTotal + } + return load +} + func (s *ContentModerationService) apiKeyStatusForHash(index int, hash string, masked string, configured bool) ContentModerationAPIKeyStatus { status := ContentModerationAPIKeyStatus{ Index: index, diff --git a/backend/internal/service/content_moderation_test.go b/backend/internal/service/content_moderation_test.go index 20fce3ec..1fb72f36 100644 --- a/backend/internal/service/content_moderation_test.go +++ b/backend/internal/service/content_moderation_test.go @@ -3,9 +3,11 @@ package service import ( "context" "encoding/json" + "fmt" "net/http" "net/http/httptest" "strings" + "sync" "testing" "time" @@ -73,10 +75,13 @@ func (r *contentModerationTestSettingRepo) Delete(ctx context.Context, key strin } type contentModerationTestRepo struct { + mu sync.Mutex logs []ContentModerationLog } func (r *contentModerationTestRepo) CreateLog(ctx context.Context, log *ContentModerationLog) error { + r.mu.Lock() + defer r.mu.Unlock() if log != nil { r.logs = append(r.logs, *log) } @@ -88,14 +93,55 @@ func (r *contentModerationTestRepo) ListLogs(ctx context.Context, filter Content } func (r *contentModerationTestRepo) CountFlaggedByUserSince(ctx context.Context, userID int64, since time.Time) (int, error) { - return 0, nil + r.mu.Lock() + defer r.mu.Unlock() + count := 0 + for _, log := range r.logs { + if log.UserID == nil || *log.UserID != userID || !log.Flagged || log.Action == ContentModerationActionHashBlock { + continue + } + if log.CreatedAt.IsZero() || log.CreatedAt.Before(since) { + continue + } + count++ + } + return count, nil } func (r *contentModerationTestRepo) CleanupExpiredLogs(ctx context.Context, hitBefore time.Time, nonHitBefore time.Time) (*ContentModerationCleanupResult, error) { return &ContentModerationCleanupResult{}, nil } +func (r *contentModerationTestRepo) snapshotLogs() []ContentModerationLog { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]ContentModerationLog, len(r.logs)) + copy(out, r.logs) + return out +} + +func requireContentModerationLogCount(t *testing.T, repo *contentModerationTestRepo, want int) []ContentModerationLog { + t.Helper() + var logs []ContentModerationLog + require.Eventually(t, func() bool { + logs = repo.snapshotLogs() + return len(logs) == want + }, time.Second, 10*time.Millisecond) + return logs +} + +func requireRecordedHashCount(t *testing.T, cache *contentModerationTestHashCache, want int) []string { + t.Helper() + var hashes []string + require.Eventually(t, func() bool { + hashes = cache.snapshotRecorded() + return len(hashes) == want + }, time.Second, 10*time.Millisecond) + return hashes +} + type contentModerationTestHashCache struct { + mu sync.Mutex hashes map[string]struct{} recorded []string checked []string @@ -246,6 +292,8 @@ func (i *contentModerationTestAuthCacheInvalidator) InvalidateAuthCacheByGroupID } func (c *contentModerationTestHashCache) RecordFlaggedInputHash(ctx context.Context, inputHash string) error { + c.mu.Lock() + defer c.mu.Unlock() if c.hashes == nil { c.hashes = map[string]struct{}{} } @@ -255,6 +303,8 @@ func (c *contentModerationTestHashCache) RecordFlaggedInputHash(ctx context.Cont } func (c *contentModerationTestHashCache) HasFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) { + c.mu.Lock() + defer c.mu.Unlock() c.checked = append(c.checked, inputHash) if c.hasResultUsed { return c.hasResult, nil @@ -264,6 +314,8 @@ func (c *contentModerationTestHashCache) HasFlaggedInputHash(ctx context.Context } func (c *contentModerationTestHashCache) DeleteFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) { + c.mu.Lock() + defer c.mu.Unlock() c.deleted = append(c.deleted, inputHash) if c.hashes == nil { return false, nil @@ -276,15 +328,50 @@ func (c *contentModerationTestHashCache) DeleteFlaggedInputHash(ctx context.Cont } func (c *contentModerationTestHashCache) ClearFlaggedInputHashes(ctx context.Context) (int64, error) { + c.mu.Lock() + defer c.mu.Unlock() deleted := int64(len(c.hashes)) c.hashes = map[string]struct{}{} return deleted, nil } func (c *contentModerationTestHashCache) CountFlaggedInputHashes(ctx context.Context) (int64, error) { + c.mu.Lock() + defer c.mu.Unlock() return int64(len(c.hashes)), nil } +func (c *contentModerationTestHashCache) snapshotRecorded() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.recorded)) + copy(out, c.recorded) + return out +} + +func (c *contentModerationTestHashCache) snapshotChecked() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.checked)) + copy(out, c.checked) + return out +} + +func (c *contentModerationTestHashCache) hasHash(inputHash string) bool { + c.mu.Lock() + defer c.mu.Unlock() + _, ok := c.hashes[inputHash] + return ok +} + +func (c *contentModerationTestHashCache) snapshotDeleted() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.deleted)) + copy(out, c.deleted) + return out +} + func TestBuildContentModerationLog_RedactsInputExcerpt(t *testing.T) { svc := &ContentModerationService{} cfg := defaultContentModerationConfig() @@ -381,10 +468,10 @@ func TestContentModerationCheck_PreBlockKeywordHitSkipsUpstreamCall(t *testing.T require.True(t, decision.Blocked) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) require.False(t, upstreamCalled, "keyword block must short-circuit upstream moderation call") - require.Len(t, repo.logs, 1) - require.True(t, repo.logs[0].Flagged) - require.Equal(t, ContentModerationActionKeywordBlock, repo.logs[0].Action) - require.Equal(t, contentModerationKeywordCategory, repo.logs[0].HighestCategory) + logs := requireContentModerationLogCount(t, repo, 1) + require.True(t, logs[0].Flagged) + require.Equal(t, ContentModerationActionKeywordBlock, logs[0].Action) + require.Equal(t, contentModerationKeywordCategory, logs[0].HighestCategory) } func TestContentModerationCheck_KeywordsIgnoredInObserveMode(t *testing.T) { @@ -474,7 +561,7 @@ func TestContentModerationCheck_KeywordOnlyStrategySkipsAPIOnMiss(t *testing.T) require.NoError(t, err) require.True(t, decision.Allowed, "keyword-only must allow misses without calling the API") require.False(t, upstreamCalled, "keyword-only must not call the upstream moderation API") - require.Len(t, repo.logs, 0) + require.Len(t, repo.snapshotLogs(), 0) } func TestContentModerationCheck_APIOnlyStrategyIgnoresKeywordList(t *testing.T) { @@ -545,7 +632,7 @@ func TestContentModerationCheck_ModelFilterAllAuditsEveryModel(t *testing.T) { require.True(t, decision.Blocked) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) } - require.Len(t, repo.logs, 2) + requireContentModerationLogCount(t, repo, 2) } func TestContentModerationCheck_ModelFilterIncludeOnlyAuditsListedModels(t *testing.T) { @@ -571,8 +658,8 @@ func TestContentModerationCheck_ModelFilterIncludeOnlyAuditsListedModels(t *test require.True(t, decision.Allowed) require.False(t, decision.Blocked) require.Equal(t, ContentModerationActionAllow, decision.Action) - require.Len(t, repo.logs, 1) - require.Equal(t, "gpt-5.5", repo.logs[0].Model) + logs := requireContentModerationLogCount(t, repo, 1) + require.Equal(t, "gpt-5.5", logs[0].Model) } func TestContentModerationCheck_ModelFilterExcludeSkipsListedModels(t *testing.T) { @@ -598,8 +685,8 @@ func TestContentModerationCheck_ModelFilterExcludeSkipsListedModels(t *testing.T require.True(t, decision.Allowed) require.False(t, decision.Blocked) require.Equal(t, ContentModerationActionAllow, decision.Action) - require.Len(t, repo.logs, 1) - require.Equal(t, "gpt-5.5", repo.logs[0].Model) + logs := requireContentModerationLogCount(t, repo, 1) + require.Equal(t, "gpt-5.5", logs[0].Model) } func TestContentModerationLoadConfig_LegacyConfigDefaultsModelFilterToAll(t *testing.T) { @@ -639,8 +726,8 @@ func TestContentModerationCheck_ModelFilterUsesRequestedModelNotBodyModel(t *tes require.NoError(t, err) require.True(t, decision.Blocked) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) - require.Len(t, repo.logs, 1) - require.Equal(t, "gpt-5.5", repo.logs[0].Model) + logs := requireContentModerationLogCount(t, repo, 1) + require.Equal(t, "gpt-5.5", logs[0].Model) } func defaultContentModerationModelFilterTestConfig() *ContentModerationConfig { @@ -939,11 +1026,11 @@ func TestContentModerationCheck_OpenAIResponsesRecordsNonHitForCodexPayload(t *t require.NoError(t, err) require.False(t, decision.Blocked) - require.Len(t, repo.logs, 1) - require.False(t, repo.logs[0].Flagged) - require.Equal(t, ContentModerationActionAllow, repo.logs[0].Action) - require.Equal(t, "/responses", repo.logs[0].Endpoint) - require.Equal(t, "last user prompt", repo.logs[0].InputExcerpt) + logs := requireContentModerationLogCount(t, repo, 1) + require.False(t, logs[0].Flagged) + require.Equal(t, ContentModerationActionAllow, logs[0].Action) + require.Equal(t, "/responses", logs[0].Endpoint) + require.Equal(t, "last user prompt", logs[0].InputExcerpt) require.Equal(t, "last user prompt", moderationRequest.Input) } @@ -1007,14 +1094,164 @@ func TestContentModerationCheck_PreBlockBlocksCodexResponsesLatestUserInput(t *t require.Equal(t, ContentModerationActionBlock, decision.Action) require.Equal(t, http.StatusUnavailableForLegalReasons, decision.StatusCode) require.Equal(t, "内容审计测试阻断", decision.Message) - require.Len(t, repo.logs, 1) - require.True(t, repo.logs[0].Flagged) - require.Equal(t, ContentModerationActionBlock, repo.logs[0].Action) - require.Equal(t, ContentModerationModePreBlock, repo.logs[0].Mode) - require.Equal(t, "latest blocked prompt", repo.logs[0].InputExcerpt) + logs := requireContentModerationLogCount(t, repo, 1) + require.True(t, logs[0].Flagged) + require.Equal(t, ContentModerationActionBlock, logs[0].Action) + require.Equal(t, ContentModerationModePreBlock, logs[0].Mode) + require.Equal(t, "latest blocked prompt", logs[0].InputExcerpt) require.Equal(t, "latest blocked prompt", moderationRequest.Input) } +func TestContentModerationStatusTracksPreBlockSyncMetrics(t *testing.T) { + var requestCount int + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + score := 0.01 + if requestCount == 1 { + score = 0.9 + } + time.Sleep(5 * time.Millisecond) + _ = json.NewEncoder(w).Encode(moderationAPIResponse{ + Results: []moderationAPIResult{{ + CategoryScores: map[string]float64{"sexual": score}, + }}, + }) + })) + defer server.Close() + + cfg := defaultContentModerationConfig() + cfg.Enabled = true + cfg.Mode = ContentModerationModePreBlock + cfg.BaseURL = server.URL + cfg.APIKeys = []string{"sk-test"} + rawCfg, err := json.Marshal(cfg) + require.NoError(t, err) + + svc := NewContentModerationService( + &contentModerationTestSettingRepo{values: map[string]string{ + SettingKeyRiskControlEnabled: "true", + SettingKeyContentModerationConfig: string(rawCfg), + }}, + &contentModerationTestRepo{}, + &contentModerationTestHashCache{}, + nil, + nil, + nil, + nil, + ) + + for _, prompt := range []string{"blocked prompt", "clean prompt"} { + _, err := svc.Check(context.Background(), ContentModerationCheckInput{ + UserID: 1001, + Protocol: ContentModerationProtocolOpenAIChat, + Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":%q}]}`, prompt)), + }) + require.NoError(t, err) + } + + status, err := svc.GetStatus(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(2), status.PreBlockChecked) + require.Equal(t, int64(1), status.PreBlockAllowed) + require.Equal(t, int64(1), status.PreBlockBlocked) + require.Equal(t, int64(0), status.PreBlockErrors) + require.Equal(t, 0, status.PreBlockActive) + require.GreaterOrEqual(t, status.PreBlockAvgLatencyMS, int64(1)) +} + +func TestContentModerationStatusTracksPreBlockAPIKeyLoad(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(moderationAPIResponse{ + Results: []moderationAPIResult{{ + CategoryScores: map[string]float64{"sexual": 0.01}, + }}, + }) + })) + defer server.Close() + + cfg := defaultContentModerationConfig() + cfg.Enabled = true + cfg.Mode = ContentModerationModePreBlock + cfg.BaseURL = server.URL + cfg.APIKeys = []string{"sk-one", "sk-two"} + rawCfg, err := json.Marshal(cfg) + require.NoError(t, err) + + svc := NewContentModerationService( + &contentModerationTestSettingRepo{values: map[string]string{ + SettingKeyRiskControlEnabled: "true", + SettingKeyContentModerationConfig: string(rawCfg), + }}, + &contentModerationTestRepo{}, + &contentModerationTestHashCache{}, + nil, + nil, + nil, + nil, + ) + + for idx := 0; idx < 4; idx++ { + _, err := svc.Check(context.Background(), ContentModerationCheckInput{ + UserID: 1001, + Protocol: ContentModerationProtocolOpenAIChat, + Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":"prompt %d"}]}`, idx)), + }) + require.NoError(t, err) + } + + status, err := svc.GetStatus(context.Background()) + require.NoError(t, err) + require.Len(t, status.PreBlockAPIKeyLoads, 2) + require.Equal(t, int64(4), status.PreBlockAPIKeyTotalCalls) + require.Equal(t, int64(2), status.PreBlockAPIKeyAvailableCount) + require.Equal(t, int64(0), status.PreBlockAPIKeyActive) + require.Equal(t, int64(0), status.PreBlockAPIKeyLoads[0].Active) + require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[0].Total) + require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[0].Success) + require.Equal(t, int64(0), status.PreBlockAPIKeyLoads[0].Errors) + require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[1].Total) + require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[1].Success) +} + +func TestContentModerationStatusTracksPreBlockLocalBlocks(t *testing.T) { + cfg := defaultContentModerationConfig() + cfg.Enabled = true + cfg.Mode = ContentModerationModePreBlock + cfg.KeywordBlockingMode = ContentModerationKeywordModeKeywordOnly + cfg.BlockedKeywords = []string{"blocked"} + rawCfg, err := json.Marshal(cfg) + require.NoError(t, err) + + svc := NewContentModerationService( + &contentModerationTestSettingRepo{values: map[string]string{ + SettingKeyRiskControlEnabled: "true", + SettingKeyContentModerationConfig: string(rawCfg), + }}, + &contentModerationTestRepo{}, + &contentModerationTestHashCache{}, + nil, + nil, + nil, + nil, + ) + + for _, prompt := range []string{"blocked prompt", "clean prompt"} { + _, err := svc.Check(context.Background(), ContentModerationCheckInput{ + UserID: 1001, + Protocol: ContentModerationProtocolOpenAIChat, + Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":%q}]}`, prompt)), + }) + require.NoError(t, err) + } + + status, err := svc.GetStatus(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(2), status.PreBlockChecked) + require.Equal(t, int64(1), status.PreBlockAllowed) + require.Equal(t, int64(1), status.PreBlockBlocked) + require.Equal(t, int64(0), status.PreBlockErrors) +} + func TestBuildContentModerationTestAuditResult_UsesConfiguredThresholdsOnly(t *testing.T) { result := buildContentModerationTestAuditResult(&moderationAPIResult{ Flagged: true, @@ -1137,6 +1374,8 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) { cfg.APIKeys = []string{"sk-test"} cfg.BlockStatus = http.StatusConflict cfg.BlockMessage = "命中历史风险输入" + cfg.AutoBanEnabled = true + cfg.BanThreshold = 1 rawCfg, err := json.Marshal(cfg) require.NoError(t, err) @@ -1145,20 +1384,23 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) { content.Normalize() hashCache.hashes[content.Hash()] = struct{}{} + repo := &contentModerationTestRepo{} + userRepo := &contentModerationTestUserRepo{user: &User{ID: 1001, Status: StatusActive}} svc := NewContentModerationService( &contentModerationTestSettingRepo{values: map[string]string{ SettingKeyRiskControlEnabled: "true", SettingKeyContentModerationConfig: string(rawCfg), }}, - &contentModerationTestRepo{}, + repo, hashCache, nil, - nil, + userRepo, nil, nil, ) decision, err := svc.Check(context.Background(), ContentModerationCheckInput{ + UserID: 1001, Protocol: ContentModerationProtocolOpenAIChat, Body: []byte(`{"messages":[{"role":"user","content":"blocked prompt"}]}`), }) @@ -1169,7 +1411,73 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) { require.Equal(t, content.Hash(), decision.InputHash) require.Contains(t, decision.Message, "命中历史风险输入") require.Contains(t, decision.Message, content.Hash()) - require.Len(t, hashCache.checked, 1) + require.Len(t, hashCache.snapshotChecked(), 1) + logs := requireContentModerationLogCount(t, repo, 1) + require.True(t, logs[0].Flagged) + require.Equal(t, ContentModerationActionHashBlock, logs[0].Action) + require.Equal(t, 1.0, logs[0].CategoryScores["hash"]) + require.Equal(t, ContentModerationModePreBlock, logs[0].Mode) + require.Zero(t, logs[0].ViolationCount) + require.False(t, logs[0].AutoBanned) + require.Empty(t, userRepo.updated) +} + +func TestContentModerationCheck_HashBlockLogsDoNotIncreaseNextViolationCount(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(moderationAPIResponse{ + Results: []moderationAPIResult{{ + CategoryScores: map[string]float64{"sexual": 0.9}, + }}, + }) + })) + defer server.Close() + + cfg := defaultContentModerationConfig() + cfg.Enabled = true + cfg.Mode = ContentModerationModePreBlock + cfg.BaseURL = server.URL + cfg.APIKeys = []string{"sk-test"} + cfg.AutoBanEnabled = false + rawCfg, err := json.Marshal(cfg) + require.NoError(t, err) + + userID := int64(1001) + repo := &contentModerationTestRepo{} + hashLog := &ContentModerationLog{ + UserID: &userID, + Action: ContentModerationActionHashBlock, + Flagged: true, + HighestCategory: "hash", + HighestScore: 1, + CreatedAt: time.Now(), + } + require.NoError(t, repo.CreateLog(context.Background(), hashLog)) + + svc := NewContentModerationService( + &contentModerationTestSettingRepo{values: map[string]string{ + SettingKeyRiskControlEnabled: "true", + SettingKeyContentModerationConfig: string(rawCfg), + }}, + repo, + &contentModerationTestHashCache{}, + nil, + nil, + nil, + nil, + ) + + decision, err := svc.Check(context.Background(), ContentModerationCheckInput{ + UserID: userID, + Protocol: ContentModerationProtocolOpenAIChat, + Body: []byte(`{"messages":[{"role":"user","content":"new blocked prompt"}]}`), + }) + + require.NoError(t, err) + require.True(t, decision.Blocked) + logs := requireContentModerationLogCount(t, repo, 2) + require.Equal(t, ContentModerationActionHashBlock, logs[0].Action) + require.Equal(t, ContentModerationActionBlock, logs[1].Action) + require.Equal(t, 1, logs[1].ViolationCount) } func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T) { @@ -1219,8 +1527,8 @@ func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T require.True(t, decision.Blocked) require.Equal(t, ContentModerationActionBlock, decision.Action) require.Equal(t, 1, requestCount) - require.Len(t, hashCache.recorded, 1) - require.Len(t, repo.logs, 1) + recorded := requireRecordedHashCount(t, hashCache, 1) + requireContentModerationLogCount(t, repo, 1) decision, err = svc.Check(context.Background(), ContentModerationCheckInput{ Protocol: ContentModerationProtocolOpenAIChat, @@ -1229,9 +1537,11 @@ func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T require.NoError(t, err) require.True(t, decision.Blocked) require.Equal(t, ContentModerationActionHashBlock, decision.Action) - require.Equal(t, hashCache.recorded[0], decision.InputHash) + require.Equal(t, recorded[0], decision.InputHash) require.Equal(t, 1, requestCount) - require.Len(t, repo.logs, 1) + logs := requireContentModerationLogCount(t, repo, 2) + require.Equal(t, ContentModerationActionBlock, logs[0].Action) + require.Equal(t, ContentModerationActionHashBlock, logs[1].Action) } func TestContentModerationDeleteFlaggedInputHash_NormalizesAndDeletes(t *testing.T) { @@ -1246,8 +1556,8 @@ func TestContentModerationDeleteFlaggedInputHash_NormalizesAndDeletes(t *testing require.NoError(t, err) require.Equal(t, existingHash, result.InputHash) require.True(t, result.Deleted) - require.NotContains(t, hashCache.hashes, existingHash) - require.Equal(t, []string{existingHash}, hashCache.deleted) + require.False(t, hashCache.hasHash(existingHash)) + require.Equal(t, []string{existingHash}, hashCache.snapshotDeleted()) result, err = svc.DeleteFlaggedInputHash(context.Background(), existingHash) @@ -1327,8 +1637,8 @@ func TestContentModerationCheck_AsyncFlaggedWritesRedisHashCache(t *testing.T) { }, cfg, ContentModerationInput{Text: "bad prompt"}, strings.Repeat("b", 64), contentModerationIntPtr(25), false) require.False(t, decision.Blocked) - require.Len(t, hashCache.recorded, 1) - require.Len(t, repo.logs, 1) + requireRecordedHashCount(t, hashCache, 1) + requireContentModerationLogCount(t, repo, 1) } func TestBuildContentModerationAccountDisabledEmailBody_ContainsBanDetails(t *testing.T) { diff --git a/frontend/src/api/admin/riskControl.ts b/frontend/src/api/admin/riskControl.ts index 521114c2..aefd1618 100644 --- a/frontend/src/api/admin/riskControl.ts +++ b/frontend/src/api/admin/riskControl.ts @@ -132,6 +132,16 @@ export interface ContentModerationRuntimeStatus { dropped: number processed: number errors: number + pre_block_active: number + pre_block_checked: number + pre_block_allowed: number + pre_block_blocked: number + pre_block_errors: number + pre_block_avg_latency_ms: number + pre_block_api_key_active: number + pre_block_api_key_available_count: number + pre_block_api_key_total_calls: number + pre_block_api_key_loads: ContentModerationAPIKeyLoad[] api_key_statuses: ContentModerationAPIKeyStatus[] flagged_hash_count: number last_cleanup_at?: string @@ -139,6 +149,20 @@ export interface ContentModerationRuntimeStatus { last_cleanup_deleted_non_hit: number } +export interface ContentModerationAPIKeyLoad { + index: number + key_hash: string + masked: string + status: ContentModerationAPIKeyStatusValue + active: number + total: number + success: number + errors: number + avg_latency_ms: number + last_latency_ms: number + last_http_status: number +} + export interface ContentModerationLog { id: number request_id: string diff --git a/frontend/src/i18n/__tests__/riskControlLocales.spec.ts b/frontend/src/i18n/__tests__/riskControlLocales.spec.ts new file mode 100644 index 00000000..eab94fe6 --- /dev/null +++ b/frontend/src/i18n/__tests__/riskControlLocales.spec.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from 'vitest' + +import en from '../locales/en' +import zh from '../locales/zh' + +describe('risk control locale copy', () => { + it('describes worker runtime as audit and pre-block record processing', () => { + expect(zh.admin.riskControl.workerStatusHint).toContain('前置拦截记录任务') + expect(zh.admin.riskControl.workerStatusHint).not.toContain('异步观察任务') + expect(en.admin.riskControl.workerStatusHint).toContain('pre-block record tasks') + expect(en.admin.riskControl.workerStatusHint).not.toContain('observation tasks') + }) + + it('keeps pre-block audit key summary aware of async worker load', () => { + expect(zh.admin.riskControl.preBlockAPIKeyLoadSummary).toContain('worker:{workerActive} / {workerTotal}') + expect(en.admin.riskControl.preBlockAPIKeyLoadSummary).toContain('worker: {workerActive} / {workerTotal}') + }) + + it('does not describe pre-block audit key polling as bypassing the worker pool', () => { + expect(zh.admin.riskControl.preBlockAPIKeyLoadHint).toBe('同步前置拦截直接轮询可用审核 Key。') + expect(zh.admin.riskControl.preBlockAPIKeyLoadHint).not.toContain('Worker 池') + expect(en.admin.riskControl.preBlockAPIKeyLoadHint).not.toContain('worker pool') + }) +}) diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index ff5ea651..41c3c495 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -2599,14 +2599,37 @@ export default { modelFilterIncludeSummary: 'Applies to {count} models', modelFilterExcludeSummary: 'Excludes {count} models', emptyLogs: 'No audit records', + preBlockSyncStatus: 'Pre-Block Sync Status', + preBlockSyncHint: 'Live counters for the synchronous moderation path, excluding async record tasks.', + preBlockActive: 'Sync Processing', + preBlockActiveHint: 'Currently checking', + preBlockChecked: 'Checked', + preBlockCheckedHint: 'Entered pre-block path', + preBlockAllowed: 'Allowed', + preBlockAllowedHint: 'No block triggered', + preBlockBlocked: 'Blocked', + preBlockBlockedHint: 'Rejected after hit', + preBlockErrors: 'Audit Errors', + preBlockErrorsHint: 'Failed or no usable key', + preBlockAvgLatency: 'Avg Latency', + preBlockAvgLatencyHint: 'Synchronous path average', + preBlockAPIKeyLoad: 'Audit Key Load', + preBlockAPIKeyLoadHint: 'Synchronous pre-block checks round-robin usable audit keys directly.', + preBlockAPIKeyLoadSummary: 'Sync active {active} / usable keys {available}, {total} total, worker: {workerActive} / {workerTotal}', + preBlockAPIKeyTotals: 'Total {total}, success {success}, errors {errors}', + preBlockAPIKeyLoadEmpty: 'No audit key load data yet', + preBlockKeyActiveShort: 'Active', + preBlockKeyTotalShort: 'Total', + preBlockKeyAvgShort: 'Avg', + preBlockKeyLastShort: 'Last', workerStatus: 'Worker Runtime', - workerStatusHint: 'Queue and worker pool status for asynchronous observation tasks.', + workerStatusHint: 'Queue and worker pool status for async audit tasks and pre-block record tasks, excluding synchronous pre-block checks.', workerPool: 'Worker Pool', workerPoolMeta: '{active} processing, {idle} idle and ready, {total} total', queueUsage: 'Queue Usage', activeWorkers: 'Processing', idleWorkers: 'Idle Ready', - workerActive: 'Processing an asynchronous audit task', + workerActive: 'Processing an async audit or record task', workerIdle: 'Started, idle and ready', workerDisabled: 'Risk control or content audit is disabled', processed: 'Processed', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index b8ac7d2c..8ff8ea80 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -2676,14 +2676,37 @@ export default { modelFilterIncludeSummary: '仅 {count} 个模型生效', modelFilterExcludeSummary: '排除 {count} 个模型', emptyLogs: '暂无审核记录', + preBlockSyncStatus: '前置拦截同步状态', + preBlockSyncHint: '同步审核链路的实时计数,不包含异步写记录任务。', + preBlockActive: '同步处理中', + preBlockActiveHint: '当前正在审核', + preBlockChecked: '已检查', + preBlockCheckedHint: '进入前置拦截链路', + preBlockAllowed: '已放行', + preBlockAllowedHint: '未触发拦截', + preBlockBlocked: '已拦截', + preBlockBlockedHint: '命中后拒绝请求', + preBlockErrors: '审核异常', + preBlockErrorsHint: '失败或无可用 Key', + preBlockAvgLatency: '平均耗时', + preBlockAvgLatencyHint: '同步链路平均值', + preBlockAPIKeyLoad: '审核 Key 负载', + preBlockAPIKeyLoadHint: '同步前置拦截直接轮询可用审核 Key。', + preBlockAPIKeyLoadSummary: '同步并发 {active} / 可用 Key {available},累计 {total} 次,worker:{workerActive} / {workerTotal}', + preBlockAPIKeyTotals: '累计 {total},成功 {success},异常 {errors}', + preBlockAPIKeyLoadEmpty: '暂无审核 Key 负载数据', + preBlockKeyActiveShort: '并发', + preBlockKeyTotalShort: '累计', + preBlockKeyAvgShort: '平均', + preBlockKeyLastShort: '最近', workerStatus: 'Worker 运行状态', - workerStatusHint: '异步观察任务的队列和 worker 池状态。', + workerStatusHint: '异步审计任务和前置拦截记录任务的队列与 Worker 池状态,不包含同步前置拦截审核请求。', workerPool: 'Worker 池', workerPoolMeta: '{active} 个处理中,{idle} 个空闲可用,共 {total} 个', queueUsage: '队列占用', activeWorkers: '处理中', idleWorkers: '空闲可用', - workerActive: '正在处理异步审计任务', + workerActive: '正在处理异步审计或记录任务', workerIdle: '已启动,当前空闲可用', workerDisabled: '风控或内容审计未启用', processed: '已处理', diff --git a/frontend/src/views/admin/RiskControlView.vue b/frontend/src/views/admin/RiskControlView.vue index 36a04756..b6d62767 100644 --- a/frontend/src/views/admin/RiskControlView.vue +++ b/frontend/src/views/admin/RiskControlView.vue @@ -53,7 +53,105 @@ -
+
+
+
+
+

{{ t('admin.riskControl.preBlockSyncStatus') }}

+

{{ t('admin.riskControl.preBlockSyncHint') }}

+
+ + {{ modeLabel(status?.mode ?? configForm.mode) }} + +
+ +
+
+
+

{{ item.label }}

+

{{ item.value }}

+

{{ item.meta }}

+
+
+
+
+ +
+
+
+

{{ t('admin.riskControl.preBlockAPIKeyLoad') }}

+

+ {{ t('admin.riskControl.preBlockAPIKeyLoadHint') }} +

+
+ + {{ preBlockAPIKeyLoadSummaryText }} + +
+ +
+
+
+
+
+
+ #{{ item.index + 1 }} + {{ item.masked || '-' }} + +
+

+ {{ t('admin.riskControl.preBlockAPIKeyTotals', { total: formatNumber(item.total), success: formatNumber(item.success), errors: formatNumber(item.errors) }) }} +

+
+
+
+

{{ t('admin.riskControl.preBlockKeyActiveShort') }}

+

{{ formatNumber(item.active) }}

+
+
+

{{ t('admin.riskControl.preBlockKeyTotalShort') }}

+

{{ formatNumber(item.total) }}

+
+
+

{{ t('admin.riskControl.preBlockKeyAvgShort') }}

+

{{ formatNumber(item.avg_latency_ms) }} ms

+
+
+

{{ t('admin.riskControl.preBlockKeyLastShort') }}

+

{{ formatNumber(item.last_latency_ms) }} ms

+
+
+
+
+
+
+
+
+

+ {{ t('admin.riskControl.preBlockAPIKeyLoadEmpty') }} +

+
+
+
+ +

{{ t('admin.riskControl.workerStatus') }}

@@ -1013,6 +1111,7 @@ import Pagination from '@/components/common/Pagination.vue' import ModelWhitelistSelector from '@/components/account/ModelWhitelistSelector.vue' import { adminAPI } from '@/api/admin' import type { + ContentModerationAPIKeyLoad, ContentModerationAPIKeyStatus, ContentModerationConfig, ContentModerationLog, @@ -1472,6 +1571,81 @@ const queueUsageStyle = computed(() => ({ width: queueUsagePercent.value, })) +const runtimeMode = computed(() => status.value?.mode ?? configForm.mode) + +const showPreBlockRuntimeCard = computed(() => runtimeMode.value === 'pre_block') + +const showWorkerRuntimeCard = computed(() => runtimeMode.value === 'observe') + +const preBlockMetricItems = computed(() => [ + { + key: 'active', + label: t('admin.riskControl.preBlockActive'), + value: formatNumber(status.value?.pre_block_active ?? 0), + meta: t('admin.riskControl.preBlockActiveHint'), + class: 'bg-sky-50 dark:bg-sky-900/10', + valueClass: 'text-sky-700 dark:text-sky-300', + }, + { + key: 'checked', + label: t('admin.riskControl.preBlockChecked'), + value: formatNumber(status.value?.pre_block_checked ?? 0), + meta: t('admin.riskControl.preBlockCheckedHint'), + class: 'bg-gray-50 dark:bg-dark-700/50', + valueClass: 'text-gray-900 dark:text-white', + }, + { + key: 'allowed', + label: t('admin.riskControl.preBlockAllowed'), + value: formatNumber(status.value?.pre_block_allowed ?? 0), + meta: t('admin.riskControl.preBlockAllowedHint'), + class: 'bg-emerald-50 dark:bg-emerald-900/10', + valueClass: 'text-emerald-700 dark:text-emerald-300', + }, + { + key: 'blocked', + label: t('admin.riskControl.preBlockBlocked'), + value: formatNumber(status.value?.pre_block_blocked ?? 0), + meta: t('admin.riskControl.preBlockBlockedHint'), + class: 'bg-rose-50 dark:bg-rose-900/10', + valueClass: 'text-rose-700 dark:text-rose-300', + }, + { + key: 'errors', + label: t('admin.riskControl.preBlockErrors'), + value: formatNumber(status.value?.pre_block_errors ?? 0), + meta: t('admin.riskControl.preBlockErrorsHint'), + class: 'bg-amber-50 dark:bg-amber-900/10', + valueClass: 'text-amber-700 dark:text-amber-300', + }, + { + key: 'latency', + label: t('admin.riskControl.preBlockAvgLatency'), + value: `${formatNumber(status.value?.pre_block_avg_latency_ms ?? 0)} ms`, + meta: t('admin.riskControl.preBlockAvgLatencyHint'), + class: 'bg-violet-50 dark:bg-violet-900/10', + valueClass: 'text-violet-700 dark:text-violet-300', + }, +]) + +const preBlockAPIKeyLoads = computed(() => ( + [...(status.value?.pre_block_api_key_loads ?? [])].sort((a, b) => a.index - b.index) +)) + +const preBlockAPIKeyMaxTotal = computed(() => Math.max(1, ...preBlockAPIKeyLoads.value.map((item) => item.total || 0))) + +const preBlockAPIKeyLoadSummaryText = computed(() => t('admin.riskControl.preBlockAPIKeyLoadSummary', { + active: formatNumber(status.value?.pre_block_api_key_active ?? 0), + available: formatNumber(status.value?.pre_block_api_key_available_count ?? 0), + total: formatNumber(status.value?.pre_block_api_key_total_calls ?? 0), + workerActive: formatNumber(status.value?.active_workers ?? 0), + workerTotal: formatNumber(status.value?.worker_count ?? configForm.worker_count), +})) + +function preBlockAPIKeyLoadWidth(total: number): string { + return `${Math.min(100, Math.max(0, (total / preBlockAPIKeyMaxTotal.value) * 100)).toFixed(1)}%` +} + const workerSlots = computed(() => { const total = Math.max(0, status.value?.worker_count ?? configForm.worker_count) const active = Math.max(0, status.value?.active_workers ?? 0) diff --git a/frontend/src/views/admin/__tests__/RiskControlView.spec.ts b/frontend/src/views/admin/__tests__/RiskControlView.spec.ts index 3c6aa0e9..5f1798a4 100644 --- a/frontend/src/views/admin/__tests__/RiskControlView.spec.ts +++ b/frontend/src/views/admin/__tests__/RiskControlView.spec.ts @@ -58,8 +58,12 @@ vi.mock('vue-i18n', async () => { return { ...actual, useI18n: () => ({ - t: (key: string, params?: Record) => - key.replace(/\{(\w+)\}/g, (_, token) => String(params?.[token] ?? `{${token}}`)), + t: (key: string, params?: Record) => { + if (key === 'admin.riskControl.preBlockAPIKeyLoadSummary') { + return `同步并发 ${params?.active} / 可用 Key ${params?.available},累计 ${params?.total} 次,worker:${params?.workerActive} / ${params?.workerTotal}` + } + return key.replace(/\{(\w+)\}/g, (_, token) => String(params?.[token] ?? `{${token}}`)) + }, }), } }) @@ -118,6 +122,16 @@ const runtimeStatus = () => ({ dropped: 0, processed: 0, errors: 0, + pre_block_active: 0, + pre_block_checked: 0, + pre_block_allowed: 0, + pre_block_blocked: 0, + pre_block_errors: 0, + pre_block_avg_latency_ms: 0, + pre_block_api_key_active: 0, + pre_block_api_key_available_count: 0, + pre_block_api_key_total_calls: 0, + pre_block_api_key_loads: [], api_key_statuses: [], flagged_hash_count: 0, last_cleanup_deleted_hit: 0, @@ -261,4 +275,133 @@ describe('admin RiskControlView', () => { })) expect(showError).not.toHaveBeenCalled() }) + + it('describes worker runtime as async audit and pre-block record processing', async () => { + getStatus.mockResolvedValue({ + ...runtimeStatus(), + mode: 'observe', + processed: 12, + queue_length: 2, + }) + + const wrapper = mount(RiskControlView, { + global: { + stubs: { + AppLayout: AppLayoutStub, + BaseDialog: BaseDialogStub, + Icon: true, + Select: true, + Toggle: true, + Pagination: true, + ModelWhitelistSelector: ModelWhitelistSelectorStub, + }, + }, + }) + + await flushPromises() + + expect(wrapper.text()).toContain('admin.riskControl.workerStatusHint') + expect(wrapper.text()).not.toContain('admin.riskControl.preBlockSyncStatus') + expect(wrapper.text()).toContain('admin.riskControl.records') + expect(wrapper.text()).toContain('12') + expect(wrapper.text()).toContain('2 / 32,768') + }) + + it('shows pre-block synchronous moderation metrics separately from worker queue', async () => { + getStatus.mockResolvedValue({ + ...runtimeStatus(), + pre_block_active: 2, + pre_block_checked: 128, + pre_block_allowed: 120, + pre_block_blocked: 8, + pre_block_errors: 1, + pre_block_avg_latency_ms: 86, + pre_block_api_key_active: 2, + pre_block_api_key_available_count: 2, + pre_block_api_key_total_calls: 128, + active_workers: 3, + worker_count: 7, + pre_block_api_key_loads: [ + { + index: 0, + key_hash: 'hash-one', + masked: 'sk-...one', + status: 'ok', + active: 1, + total: 72, + success: 70, + errors: 2, + avg_latency_ms: 84, + last_latency_ms: 80, + last_http_status: 200, + }, + { + index: 1, + key_hash: 'hash-two', + masked: 'sk-...two', + status: 'ok', + active: 1, + total: 56, + success: 56, + errors: 0, + avg_latency_ms: 90, + last_latency_ms: 92, + last_http_status: 200, + }, + ], + }) + + const wrapper = mount(RiskControlView, { + global: { + stubs: { + AppLayout: AppLayoutStub, + BaseDialog: BaseDialogStub, + Icon: true, + Select: true, + Toggle: true, + Pagination: true, + ModelWhitelistSelector: ModelWhitelistSelectorStub, + }, + }, + }) + + await flushPromises() + + expect(wrapper.text()).toContain('admin.riskControl.preBlockSyncStatus') + expect(wrapper.text()).toContain('admin.riskControl.preBlockSyncHint') + expect(wrapper.text()).not.toContain('admin.riskControl.workerStatus') + expect(wrapper.text()).toContain('admin.riskControl.records') + expect(wrapper.text()).toContain('128') + expect(wrapper.text()).toContain('120') + expect(wrapper.text()).toContain('8') + expect(wrapper.text()).toContain('86 ms') + expect(wrapper.text()).toContain('admin.riskControl.preBlockAPIKeyLoad') + expect(wrapper.text()).toContain('sk-...one') + expect(wrapper.text()).toContain('sk-...two') + expect(wrapper.text()).toContain('72') + expect(wrapper.text()).toContain('56') + expect(wrapper.text()).toContain('同步并发 2 / 可用 Key 2,累计 128 次,worker:3 / 7') + + const runtimeCards = wrapper.get('[data-test="pre-block-runtime-cards"]') + const syncCard = wrapper.get('[data-test="pre-block-sync-card"]') + const apiKeyLoadCard = wrapper.get('[data-test="pre-block-api-key-load-card"]') + + expect(runtimeCards.classes()).toEqual(expect.arrayContaining([ + 'grid', + 'grid-cols-1', + 'xl:grid-cols-[minmax(0,520px)_minmax(0,1fr)]', + ])) + expect(syncCard.element.parentElement).toBe(runtimeCards.element) + expect(apiKeyLoadCard.element.parentElement).toBe(runtimeCards.element) + expect(syncCard.classes()).toContain('card') + expect(apiKeyLoadCard.classes()).toContain('card') + expect(syncCard.get('h2').text()).toBe('admin.riskControl.preBlockSyncStatus') + expect(syncCard.text()).toContain('admin.riskControl.preBlockSyncHint') + expect(apiKeyLoadCard.get('h2').text()).toBe('admin.riskControl.preBlockAPIKeyLoad') + expect(apiKeyLoadCard.text()).toContain('admin.riskControl.preBlockAPIKeyLoadHint') + expect(wrapper.get('[data-test="pre-block-api-key-load-list"]').classes()).toEqual(expect.arrayContaining([ + 'max-h-[280px]', + 'overflow-y-auto', + ])) + }) })