Merge pull request #2862 from lyen1688/feat/pre-block-risk-control-runtime

feat: 完善前置拦截模式的风控运行态与审核记录
This commit is contained in:
Wesley Liddick 2026-05-28 20:25:43 +08:00 committed by GitHub
commit c3cd2b9f47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1193 additions and 105 deletions

View File

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -740,16 +741,31 @@ func (r *contentModerationHandlerSettingRepo) Delete(ctx context.Context, key st
} }
type contentModerationHandlerTestRepo struct { type contentModerationHandlerTestRepo struct {
mu sync.Mutex
logs []service.ContentModerationLog logs []service.ContentModerationLog
} }
func (r *contentModerationHandlerTestRepo) CreateLog(ctx context.Context, log *service.ContentModerationLog) error { func (r *contentModerationHandlerTestRepo) CreateLog(ctx context.Context, log *service.ContentModerationLog) error {
if log != nil { if log != nil {
r.mu.Lock()
defer r.mu.Unlock()
r.logs = append(r.logs, *log) r.logs = append(r.logs, *log)
} }
return nil return nil
} }
func (r *contentModerationHandlerTestRepo) resetLogs() {
r.mu.Lock()
defer r.mu.Unlock()
r.logs = nil
}
func (r *contentModerationHandlerTestRepo) logSnapshot() []service.ContentModerationLog {
r.mu.Lock()
defer r.mu.Unlock()
return append([]service.ContentModerationLog(nil), r.logs...)
}
func (r *contentModerationHandlerTestRepo) ListLogs(ctx context.Context, filter service.ContentModerationLogFilter) ([]service.ContentModerationLog, *pagination.PaginationResult, error) { func (r *contentModerationHandlerTestRepo) ListLogs(ctx context.Context, filter service.ContentModerationLogFilter) ([]service.ContentModerationLog, *pagination.PaginationResult, error) {
return nil, nil, nil return nil, nil, nil
} }
@ -808,7 +824,10 @@ func TestOpenAIResponsesWebSocket_ContentModerationBlocksFirstFrame(t *testing.T
}) })
require.NoError(t, err) require.NoError(t, err)
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
repo.logs = nil require.Eventually(t, func() bool {
return len(repo.logSnapshot()) == 1
}, time.Second, 10*time.Millisecond)
repo.resetLogs()
h := &OpenAIGatewayHandler{ h := &OpenAIGatewayHandler{
gatewayService: &service.OpenAIGatewayService{}, gatewayService: &service.OpenAIGatewayService{},
billingCacheService: &service.BillingCacheService{}, billingCacheService: &service.BillingCacheService{},
@ -848,10 +867,11 @@ func TestOpenAIResponsesWebSocket_ContentModerationBlocksFirstFrame(t *testing.T
require.Equal(t, coderws.StatusPolicyViolation, closeErr.Code) require.Equal(t, coderws.StatusPolicyViolation, closeErr.Code)
require.Contains(t, closeErr.Reason, "内容审计测试阻断") require.Contains(t, closeErr.Reason, "内容审计测试阻断")
} }
require.Len(t, repo.logs, 1) logs := repo.logSnapshot()
require.True(t, repo.logs[0].Flagged) require.Len(t, logs, 1)
require.Equal(t, service.ContentModerationActionBlock, repo.logs[0].Action) require.True(t, logs[0].Flagged)
require.Equal(t, "bad prompt", repo.logs[0].InputExcerpt) require.Equal(t, service.ContentModerationActionBlock, logs[0].Action)
require.Equal(t, "bad prompt", logs[0].InputExcerpt)
} }
func TestOpenAIResponsesWebSocket_PassthroughUsageLogPersistsUserAgentAndReasoningEffort(t *testing.T) { func TestOpenAIResponsesWebSocket_PassthroughUsageLogPersistsUserAgentAndReasoningEffort(t *testing.T) {

View File

@ -192,6 +192,7 @@ SELECT COUNT(*)
FROM content_moderation_logs FROM content_moderation_logs
WHERE user_id = $1 WHERE user_id = $1
AND flagged = TRUE AND flagged = TRUE
AND action <> 'hash_block'
AND created_at >= $2 AND created_at >= $2
AND created_at > COALESCE((SELECT at FROM last_auto_ban), '-infinity'::timestamptz) AND created_at > COALESCE((SELECT at FROM last_auto_ban), '-infinity'::timestamptz)
`, userID, since).Scan(&count) `, userID, since).Scan(&count)
@ -246,7 +247,7 @@ func buildContentModerationLogWhere(filter service.ContentModerationLogFilter) (
case "hit", "flagged": case "hit", "flagged":
where = append(where, "l.flagged = TRUE") where = append(where, "l.flagged = TRUE")
case "blocked", "block": case "blocked", "block":
where = append(where, "l.action = 'block'") where = append(where, "l.action IN ('block', 'keyword_block', 'hash_block')")
case "pass", "allow": case "pass", "allow":
where = append(where, "l.flagged = FALSE AND l.error = ''") where = append(where, "l.flagged = FALSE AND l.error = ''")
case "error": case "error":

View File

@ -0,0 +1,40 @@
package repository
import (
"context"
"regexp"
"strings"
"testing"
"time"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/stretchr/testify/require"
)
func TestBuildContentModerationLogWhere_BlockedIncludesAllBlockActions(t *testing.T) {
where, args := buildContentModerationLogWhere(service.ContentModerationLogFilter{Result: "blocked"})
require.Empty(t, args)
sql := strings.Join(where, " AND ")
require.Contains(t, sql, "l.action IN ('block', 'keyword_block', 'hash_block')")
require.NotContains(t, sql, "l.action = 'block'")
}
func TestContentModerationRepositoryCountFlaggedByUserSince_ExcludesHashBlock(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() { _ = db.Close() }()
repo := NewContentModerationRepository(db)
since := time.Now().Add(-time.Hour)
mock.ExpectQuery(regexp.QuoteMeta("AND action <> 'hash_block'")).
WithArgs(int64(1001), since).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(2))
count, err := repo.CountFlaggedByUserSince(context.Background(), 1001, since)
require.NoError(t, err)
require.Equal(t, 2, count)
require.NoError(t, mock.ExpectationsWereMet())
}

View File

@ -211,6 +211,20 @@ type ContentModerationAPIKeyStatus struct {
Configured bool `json:"configured"` Configured bool `json:"configured"`
} }
type ContentModerationAPIKeyLoad struct {
Index int `json:"index"`
KeyHash string `json:"key_hash"`
Masked string `json:"masked"`
Status string `json:"status"`
Active int64 `json:"active"`
Total int64 `json:"total"`
Success int64 `json:"success"`
Errors int64 `json:"errors"`
AvgLatencyMS int64 `json:"avg_latency_ms"`
LastLatencyMS int `json:"last_latency_ms"`
LastHTTPStatus int `json:"last_http_status"`
}
type TestContentModerationAPIKeysInput struct { type TestContentModerationAPIKeysInput struct {
APIKeys []string `json:"api_keys"` APIKeys []string `json:"api_keys"`
BaseURL string `json:"base_url"` BaseURL string `json:"base_url"`
@ -399,25 +413,35 @@ type ContentModerationCleanupResult struct {
} }
type ContentModerationRuntimeStatus struct { type ContentModerationRuntimeStatus struct {
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
RiskControlEnabled bool `json:"risk_control_enabled"` RiskControlEnabled bool `json:"risk_control_enabled"`
Mode string `json:"mode"` Mode string `json:"mode"`
WorkerCount int `json:"worker_count"` WorkerCount int `json:"worker_count"`
MaxWorkers int `json:"max_workers"` MaxWorkers int `json:"max_workers"`
ActiveWorkers int `json:"active_workers"` ActiveWorkers int `json:"active_workers"`
IdleWorkers int `json:"idle_workers"` IdleWorkers int `json:"idle_workers"`
QueueSize int `json:"queue_size"` QueueSize int `json:"queue_size"`
QueueLength int `json:"queue_length"` QueueLength int `json:"queue_length"`
QueueUsagePercent float64 `json:"queue_usage_percent"` QueueUsagePercent float64 `json:"queue_usage_percent"`
Enqueued int64 `json:"enqueued"` Enqueued int64 `json:"enqueued"`
Dropped int64 `json:"dropped"` Dropped int64 `json:"dropped"`
Processed int64 `json:"processed"` Processed int64 `json:"processed"`
Errors int64 `json:"errors"` Errors int64 `json:"errors"`
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"` PreBlockActive int `json:"pre_block_active"`
FlaggedHashCount int64 `json:"flagged_hash_count"` PreBlockChecked int64 `json:"pre_block_checked"`
LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"` PreBlockAllowed int64 `json:"pre_block_allowed"`
LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"` PreBlockBlocked int64 `json:"pre_block_blocked"`
LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"` PreBlockErrors int64 `json:"pre_block_errors"`
PreBlockAvgLatencyMS int64 `json:"pre_block_avg_latency_ms"`
PreBlockAPIKeyActive int64 `json:"pre_block_api_key_active"`
PreBlockAPIKeyAvailableCount int64 `json:"pre_block_api_key_available_count"`
PreBlockAPIKeyTotalCalls int64 `json:"pre_block_api_key_total_calls"`
PreBlockAPIKeyLoads []ContentModerationAPIKeyLoad `json:"pre_block_api_key_loads"`
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"`
FlaggedHashCount int64 `json:"flagged_hash_count"`
LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"`
LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"`
LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"`
} }
type ContentModerationUnbanUserResult struct { type ContentModerationUnbanUserResult struct {
@ -466,6 +490,12 @@ type ContentModerationService struct {
asyncDropped atomic.Int64 asyncDropped atomic.Int64
asyncProcessed atomic.Int64 asyncProcessed atomic.Int64
asyncErrors atomic.Int64 asyncErrors atomic.Int64
preBlockActive atomic.Int64
preBlockChecked atomic.Int64
preBlockAllowed atomic.Int64
preBlockBlocked atomic.Int64
preBlockErrors atomic.Int64
preBlockLatencyTotalMS atomic.Int64
lastCleanupUnix atomic.Int64 lastCleanupUnix atomic.Int64
lastCleanupDeletedHit atomic.Int64 lastCleanupDeletedHit atomic.Int64
lastCleanupDeletedNonHit atomic.Int64 lastCleanupDeletedNonHit atomic.Int64
@ -474,10 +504,14 @@ type ContentModerationService struct {
} }
type contentModerationTask struct { type contentModerationTask struct {
input ContentModerationCheckInput input ContentModerationCheckInput
content ContentModerationInput content ContentModerationInput
inputHash string inputHash string
enqueuedAt time.Time log *ContentModerationLog
config *ContentModerationConfig
recordHash bool
applySideEffects bool
enqueuedAt time.Time
} }
type contentModerationKeyHealth struct { type contentModerationKeyHealth struct {
@ -491,6 +525,11 @@ type contentModerationKeyHealth struct {
LastLatencyMS int LastLatencyMS int
LastHTTPStatus int LastHTTPStatus int
LastTested bool LastTested bool
SyncActive int64
SyncTotal int64
SyncSuccess int64
SyncErrors int64
SyncLatencyMS int64
} }
func NewContentModerationService( func NewContentModerationService(
@ -827,9 +866,11 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
"protocol", input.Protocol, "protocol", input.Protocol,
"text_runes", len([]rune(content.Text)), "text_runes", len([]rune(content.Text)),
"image_count", len(content.Images)) "image_count", len(content.Images))
hashText := content.Hash()
if cfg.Mode == ContentModerationModePreBlock { if cfg.Mode == ContentModerationModePreBlock {
if cfg.KeywordBlockingMode != ContentModerationKeywordModeAPIOnly && len(cfg.BlockedKeywords) > 0 { if cfg.KeywordBlockingMode != ContentModerationKeywordModeAPIOnly && len(cfg.BlockedKeywords) > 0 {
if keyword, hit := matchBlockedKeyword(content.Text, cfg.BlockedKeywords); hit { if keyword, hit := matchBlockedKeyword(content.Text, cfg.BlockedKeywords); hit {
s.recordPreBlockSyncMetric(0, ContentModerationActionKeywordBlock)
slog.Info("content_moderation.keyword_block", slog.Info("content_moderation.keyword_block",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -840,8 +881,7 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
"keyword", keyword) "keyword", keyword)
scores := map[string]float64{contentModerationKeywordCategory: 1.0} scores := map[string]float64{contentModerationKeywordCategory: 1.0}
log := s.buildLog(input, cfg, ContentModerationActionKeywordBlock, true, contentModerationKeywordCategory, 1.0, scores, content.ExcerptText(), nil, nil, "") log := s.buildLog(input, cfg, ContentModerationActionKeywordBlock, true, contentModerationKeywordCategory, 1.0, scores, content.ExcerptText(), nil, nil, "")
s.applyFlaggedSideEffects(ctx, cfg, log) s.enqueueRecord(input, cfg, log, hashText, false, true)
_ = s.repo.CreateLog(ctx, log)
return &ContentModerationDecision{ return &ContentModerationDecision{
Allowed: false, Allowed: false,
Blocked: true, Blocked: true,
@ -856,6 +896,7 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
} }
} }
if cfg.KeywordBlockingMode == ContentModerationKeywordModeKeywordOnly { if cfg.KeywordBlockingMode == ContentModerationKeywordModeKeywordOnly {
s.recordPreBlockSyncMetric(0, ContentModerationActionAllow)
slog.Info("content_moderation.skip_api_keyword_only", slog.Info("content_moderation.skip_api_keyword_only",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -865,13 +906,15 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
return allow, nil return allow, nil
} }
} }
hashText := content.Hash()
if cfg.PreHashCheckEnabled && s.hashCache != nil { if cfg.PreHashCheckEnabled && s.hashCache != nil {
matched, err := s.hashCache.HasFlaggedInputHash(ctx, hashText) matched, err := s.hashCache.HasFlaggedInputHash(ctx, hashText)
if err != nil { if err != nil {
slog.Warn("content_moderation.hash_check_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err) slog.Warn("content_moderation.hash_check_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err)
} }
if matched { if matched {
if cfg.Mode == ContentModerationModePreBlock {
s.recordPreBlockSyncMetric(0, ContentModerationActionHashBlock)
}
slog.Info("content_moderation.hash_block", slog.Info("content_moderation.hash_block",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -883,6 +926,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
if message != "" { if message != "" {
message = fmt.Sprintf("%shash: %s", message, hashText) message = fmt.Sprintf("%shash: %s", message, hashText)
} }
scores := map[string]float64{"hash": 1.0}
log := s.buildLog(input, cfg, ContentModerationActionHashBlock, true, "hash", 1.0, scores, content.ExcerptText(), nil, nil, "")
s.enqueueRecord(input, cfg, log, hashText, false, false)
return &ContentModerationDecision{ return &ContentModerationDecision{
Allowed: false, Allowed: false,
Blocked: true, Blocked: true,
@ -895,6 +941,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
} }
} }
if !cfg.shouldSample(hashText) { if !cfg.shouldSample(hashText) {
if cfg.Mode == ContentModerationModePreBlock {
s.recordPreBlockSyncMetric(0, ContentModerationActionAllow)
}
slog.Info("content_moderation.skip_sample_rate", slog.Info("content_moderation.skip_sample_rate",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -905,6 +954,9 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
return allow, nil return allow, nil
} }
if len(cfg.apiKeys()) == 0 { if len(cfg.apiKeys()) == 0 {
if cfg.Mode == ContentModerationModePreBlock {
s.recordPreBlockSyncMetric(0, ContentModerationActionError)
}
slog.Warn("content_moderation.skip_no_audit_api_keys", slog.Warn("content_moderation.skip_no_audit_api_keys",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -930,10 +982,18 @@ func (s *ContentModerationService) Check(ctx context.Context, input ContentModer
func (s *ContentModerationService) checkSync(ctx context.Context, input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string, queueDelay *int, allowBlock bool) *ContentModerationDecision { func (s *ContentModerationService) checkSync(ctx context.Context, input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string, queueDelay *int, allowBlock bool) *ContentModerationDecision {
allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow} allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow}
trackPreBlock := queueDelay == nil && allowBlock && cfg != nil && cfg.Mode == ContentModerationModePreBlock
if trackPreBlock {
s.preBlockActive.Add(1)
defer s.preBlockActive.Add(-1)
}
start := time.Now() start := time.Now()
result, err := s.callModeration(ctx, cfg, content.ModerationInput()) result, err := s.callModeration(ctx, cfg, content.ModerationInput(), trackPreBlock)
latency := int(time.Since(start).Milliseconds()) latency := int(time.Since(start).Milliseconds())
if err != nil { if err != nil {
if trackPreBlock {
s.recordPreBlockSyncMetric(latency, ContentModerationActionError)
}
slog.Warn("content_moderation.audit_api_failed", slog.Warn("content_moderation.audit_api_failed",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -962,6 +1022,9 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM
action = ContentModerationActionBlock action = ContentModerationActionBlock
blocked = true blocked = true
} }
if trackPreBlock {
s.recordPreBlockSyncMetric(latency, action)
}
slog.Info("content_moderation.audit_result", slog.Info("content_moderation.audit_result",
"user_id", input.UserID, "user_id", input.UserID,
"api_key_id", input.APIKeyID, "api_key_id", input.APIKeyID,
@ -980,13 +1043,11 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM
"queue_delay_ms", queueDelay) "queue_delay_ms", queueDelay)
if flagged || cfg.RecordNonHits { if flagged || cfg.RecordNonHits {
log := s.buildLog(input, cfg, action, flagged, highestCategory, highestScore, result.CategoryScores, content.ExcerptText(), &latency, queueDelay, "") log := s.buildLog(input, cfg, action, flagged, highestCategory, highestScore, result.CategoryScores, content.ExcerptText(), &latency, queueDelay, "")
if flagged && s.hashCache != nil { if queueDelay == nil && cfg.Mode == ContentModerationModePreBlock {
if err := s.hashCache.RecordFlaggedInputHash(ctx, hashText); err != nil { s.enqueueRecord(input, cfg, log, hashText, flagged, flagged)
slog.Warn("content_moderation.record_hash_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err) } else {
} s.persistContentModerationLog(ctx, cfg, log, hashText, flagged, flagged)
} }
s.applyFlaggedSideEffects(ctx, cfg, log)
_ = s.repo.CreateLog(ctx, log)
} }
if blocked { if blocked {
return &ContentModerationDecision{ return &ContentModerationDecision{
@ -1012,6 +1073,25 @@ func (s *ContentModerationService) checkSync(ctx context.Context, input ContentM
} }
} }
func (s *ContentModerationService) recordPreBlockSyncMetric(latencyMS int, action string) {
if s == nil {
return
}
s.preBlockChecked.Add(1)
if latencyMS < 0 {
latencyMS = 0
}
s.preBlockLatencyTotalMS.Add(int64(latencyMS))
switch action {
case ContentModerationActionBlock, ContentModerationActionHashBlock, ContentModerationActionKeywordBlock:
s.preBlockBlocked.Add(1)
case ContentModerationActionError:
s.preBlockErrors.Add(1)
default:
s.preBlockAllowed.Add(1)
}
}
func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string) { func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string) {
if s == nil || s.asyncQueue == nil { if s == nil || s.asyncQueue == nil {
return return
@ -1040,11 +1120,49 @@ func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInpu
} }
} }
func (s *ContentModerationService) enqueueRecord(input ContentModerationCheckInput, cfg *ContentModerationConfig, log *ContentModerationLog, inputHash string, recordHash bool, applySideEffects bool) {
if s == nil || s.asyncQueue == nil || log == nil {
return
}
queueSize := defaultContentModerationQueueSize
if cfg != nil && cfg.QueueSize > 0 {
queueSize = cfg.QueueSize
}
if len(s.asyncQueue) >= queueSize {
slog.Warn("content_moderation.record_queue_full",
"user_id", input.UserID,
"endpoint", input.Endpoint,
"action", log.Action,
"queue_size", queueSize)
s.asyncDropped.Add(1)
return
}
task := contentModerationTask{
input: input,
inputHash: inputHash,
log: log,
config: cloneContentModerationConfig(cfg),
recordHash: recordHash,
applySideEffects: applySideEffects,
enqueuedAt: time.Now(),
}
select {
case s.asyncQueue <- task:
s.asyncEnqueued.Add(1)
default:
slog.Warn("content_moderation.record_queue_full",
"user_id", input.UserID,
"endpoint", input.Endpoint,
"action", log.Action)
s.asyncDropped.Add(1)
}
}
func (s *ContentModerationService) worker(id int) { func (s *ContentModerationService) worker(id int) {
for { for {
ctx, cancel := context.WithTimeout(context.Background(), maxContentModerationTimeoutMS*time.Millisecond+10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), maxContentModerationTimeoutMS*time.Millisecond+10*time.Second)
cfg, err := s.loadConfig(ctx) cfg, err := s.loadConfig(ctx)
if err != nil || !cfg.Enabled || cfg.Mode == ContentModerationModeOff || len(cfg.apiKeys()) == 0 || id >= cfg.WorkerCount { if err != nil || id >= cfg.WorkerCount {
cancel() cancel()
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
@ -1061,6 +1179,22 @@ func (s *ContentModerationService) worker(id int) {
slog.Error("content_moderation.worker_panic", "worker_id", id, "recover", r) slog.Error("content_moderation.worker_panic", "worker_id", id, "recover", r)
} }
}() }()
if task.log != nil {
s.asyncActive.Add(1)
defer s.asyncActive.Add(-1)
queueDelay := int(time.Since(task.enqueuedAt).Milliseconds())
task.log.QueueDelayMS = &queueDelay
taskCfg := task.config
if taskCfg == nil {
taskCfg = cfg
}
s.persistContentModerationLog(ctx, taskCfg, task.log, task.inputHash, task.recordHash, task.applySideEffects)
s.asyncProcessed.Add(1)
return
}
if !cfg.Enabled || cfg.Mode == ContentModerationModeOff || len(cfg.apiKeys()) == 0 {
return
}
if !cfg.includesGroup(task.input.GroupID) { if !cfg.includesGroup(task.input.GroupID) {
return return
} }
@ -1186,6 +1320,15 @@ func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModer
if active > cfg.WorkerCount { if active > cfg.WorkerCount {
active = cfg.WorkerCount active = cfg.WorkerCount
} }
preBlockActive := int(s.preBlockActive.Load())
if preBlockActive < 0 {
preBlockActive = 0
}
preBlockChecked := s.preBlockChecked.Load()
preBlockAvgLatency := int64(0)
if preBlockChecked > 0 {
preBlockAvgLatency = s.preBlockLatencyTotalMS.Load() / preBlockChecked
}
queueLength := 0 queueLength := 0
if s.asyncQueue != nil { if s.asyncQueue != nil {
queueLength = len(s.asyncQueue) queueLength = len(s.asyncQueue)
@ -1208,25 +1351,35 @@ func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModer
lastCleanupAt = &t lastCleanupAt = &t
} }
return &ContentModerationRuntimeStatus{ return &ContentModerationRuntimeStatus{
Enabled: cfg.Enabled, Enabled: cfg.Enabled,
RiskControlEnabled: riskEnabled, RiskControlEnabled: riskEnabled,
Mode: cfg.Mode, Mode: cfg.Mode,
WorkerCount: cfg.WorkerCount, WorkerCount: cfg.WorkerCount,
MaxWorkers: maxContentModerationWorkerCount, MaxWorkers: maxContentModerationWorkerCount,
ActiveWorkers: active, ActiveWorkers: active,
IdleWorkers: cfg.WorkerCount - active, IdleWorkers: cfg.WorkerCount - active,
QueueSize: cfg.QueueSize, QueueSize: cfg.QueueSize,
QueueLength: queueLength, QueueLength: queueLength,
QueueUsagePercent: queueUsage, QueueUsagePercent: queueUsage,
Enqueued: s.asyncEnqueued.Load(), Enqueued: s.asyncEnqueued.Load(),
Dropped: s.asyncDropped.Load(), Dropped: s.asyncDropped.Load(),
Processed: s.asyncProcessed.Load(), Processed: s.asyncProcessed.Load(),
Errors: s.asyncErrors.Load(), Errors: s.asyncErrors.Load(),
APIKeyStatuses: s.apiKeyStatuses(cfg.apiKeys()), PreBlockActive: preBlockActive,
FlaggedHashCount: flaggedHashCount, PreBlockChecked: preBlockChecked,
LastCleanupAt: lastCleanupAt, PreBlockAllowed: s.preBlockAllowed.Load(),
LastCleanupDeletedHit: s.lastCleanupDeletedHit.Load(), PreBlockBlocked: s.preBlockBlocked.Load(),
LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(), PreBlockErrors: s.preBlockErrors.Load(),
PreBlockAvgLatencyMS: preBlockAvgLatency,
PreBlockAPIKeyActive: s.preBlockAPIKeyActive(cfg.apiKeys()),
PreBlockAPIKeyAvailableCount: s.preBlockAPIKeyAvailableCount(cfg.apiKeys()),
PreBlockAPIKeyTotalCalls: s.preBlockAPIKeyTotalCalls(cfg.apiKeys()),
PreBlockAPIKeyLoads: s.preBlockAPIKeyLoads(cfg.apiKeys()),
APIKeyStatuses: s.apiKeyStatuses(cfg.apiKeys()),
FlaggedHashCount: flaggedHashCount,
LastCleanupAt: lastCleanupAt,
LastCleanupDeletedHit: s.lastCleanupDeletedHit.Load(),
LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(),
}, nil }, nil
} }
@ -1325,7 +1478,7 @@ func (s *ContentModerationService) validateConfig(ctx context.Context, cfg *Cont
return nil return nil
} }
func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any) (*moderationAPIResult, error) { func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any, trackKeyLoad ...bool) (*moderationAPIResult, error) {
attempts := cfg.RetryCount + 1 attempts := cfg.RetryCount + 1
if attempts <= 0 { if attempts <= 0 {
attempts = 1 attempts = 1
@ -1333,6 +1486,7 @@ func (s *ContentModerationService) callModeration(ctx context.Context, cfg *Cont
if attempts > maxContentModerationRetryCount+1 { if attempts > maxContentModerationRetryCount+1 {
attempts = maxContentModerationRetryCount + 1 attempts = maxContentModerationRetryCount + 1
} }
trackLoad := len(trackKeyLoad) > 0 && trackKeyLoad[0]
var lastErr error var lastErr error
for attempt := 0; attempt < attempts; attempt++ { for attempt := 0; attempt < attempts; attempt++ {
key, ok := s.nextUsableAPIKey(cfg) key, ok := s.nextUsableAPIKey(cfg)
@ -1340,14 +1494,23 @@ func (s *ContentModerationService) callModeration(ctx context.Context, cfg *Cont
lastErr = errors.New("no moderation api key available") lastErr = errors.New("no moderation api key available")
break break
} }
if trackLoad {
s.beginModerationAPIKeyCall(key)
}
start := time.Now() start := time.Now()
httpStatus := 0 httpStatus := 0
result, err := s.callModerationOnceWithInput(ctx, cfg, key, input, &httpStatus) result, err := s.callModerationOnceWithInput(ctx, cfg, key, input, &httpStatus)
latency := int(time.Since(start).Milliseconds()) latency := int(time.Since(start).Milliseconds())
if err == nil { if err == nil {
if trackLoad {
s.finishModerationAPIKeyCall(key, latency, true)
}
s.markAPIKeySuccess(key, latency, httpStatus) s.markAPIKeySuccess(key, latency, httpStatus)
return result, nil return result, nil
} }
if trackLoad {
s.finishModerationAPIKeyCall(key, latency, false)
}
s.markAPIKeyError(key, err.Error(), latency, httpStatus) s.markAPIKeyError(key, err.Error(), latency, httpStatus)
lastErr = err lastErr = err
if httpStatus == http.StatusBadRequest { if httpStatus == http.StatusBadRequest {
@ -1452,10 +1615,32 @@ func (s *ContentModerationService) buildLog(input ContentModerationCheckInput, c
} }
} }
func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) { func (s *ContentModerationService) persistContentModerationLog(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog, hashText string, recordHash bool, applySideEffects bool) {
if s == nil || cfg == nil || log == nil || !log.Flagged || log.UserID == nil || *log.UserID <= 0 { if s == nil || log == nil {
return return
} }
if recordHash && s.hashCache != nil {
if err := s.hashCache.RecordFlaggedInputHash(ctx, hashText); err != nil {
slog.Warn("content_moderation.record_hash_failed", "user_id", contentModerationEmailUserID(log), "endpoint", log.Endpoint, "error", err)
}
}
autoBanJustApplied := false
if applySideEffects {
autoBanJustApplied = s.applyFlaggedAccountSideEffects(ctx, cfg, log)
s.sendFlaggedNotificationSideEffects(ctx, cfg, log, autoBanJustApplied)
}
if s.repo != nil {
if err := s.repo.CreateLog(ctx, log); err != nil {
slog.Warn("content_moderation.create_log_failed", "user_id", contentModerationEmailUserID(log), "endpoint", log.Endpoint, "action", log.Action, "error", err)
return
}
}
}
func (s *ContentModerationService) applyFlaggedAccountSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) bool {
if s == nil || cfg == nil || log == nil || !log.Flagged || log.UserID == nil || *log.UserID <= 0 {
return false
}
count := 1 count := 1
if s.repo != nil && cfg.ViolationWindowHours > 0 { if s.repo != nil && cfg.ViolationWindowHours > 0 {
since := time.Now().Add(-time.Duration(cfg.ViolationWindowHours) * time.Hour) since := time.Now().Add(-time.Duration(cfg.ViolationWindowHours) * time.Hour)
@ -1469,13 +1654,13 @@ func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context,
user, err := s.userRepo.GetByID(ctx, *log.UserID) user, err := s.userRepo.GetByID(ctx, *log.UserID)
if err != nil { if err != nil {
slog.Warn("content_moderation.ban_get_user_failed", "user_id", *log.UserID, "error", err) slog.Warn("content_moderation.ban_get_user_failed", "user_id", *log.UserID, "error", err)
return return false
} }
if user.Status != StatusDisabled { if user.Status != StatusDisabled {
user.Status = StatusDisabled user.Status = StatusDisabled
if err := s.userRepo.Update(ctx, user); err != nil { if err := s.userRepo.Update(ctx, user); err != nil {
slog.Warn("content_moderation.ban_update_user_failed", "user_id", *log.UserID, "error", err) slog.Warn("content_moderation.ban_update_user_failed", "user_id", *log.UserID, "error", err)
return return false
} }
if s.authCacheInvalidator != nil { if s.authCacheInvalidator != nil {
s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, *log.UserID) s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, *log.UserID)
@ -1484,7 +1669,13 @@ func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context,
} }
log.AutoBanned = true log.AutoBanned = true
} }
return autoBanJustApplied
}
func (s *ContentModerationService) sendFlaggedNotificationSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog, autoBanJustApplied bool) {
if s == nil || cfg == nil || log == nil || !log.Flagged {
return
}
if s.emailService == nil || strings.TrimSpace(log.UserEmail) == "" { if s.emailService == nil || strings.TrimSpace(log.UserEmail) == "" {
return return
} }
@ -1642,6 +1833,22 @@ func defaultContentModerationConfig() *ContentModerationConfig {
} }
} }
func cloneContentModerationConfig(cfg *ContentModerationConfig) *ContentModerationConfig {
if cfg == nil {
return nil
}
clone := *cfg
clone.APIKeys = append([]string(nil), cfg.APIKeys...)
clone.GroupIDs = append([]int64(nil), cfg.GroupIDs...)
clone.BlockedKeywords = append([]string(nil), cfg.BlockedKeywords...)
clone.Thresholds = cloneFloatMap(cfg.Thresholds)
clone.ModelFilter = ContentModerationModelFilter{
Type: cfg.ModelFilter.Type,
Models: append([]string(nil), cfg.ModelFilter.Models...),
}
return &clone
}
func (cfg *ContentModerationConfig) normalize() { func (cfg *ContentModerationConfig) normalize() {
if cfg.APIKey != "" { if cfg.APIKey != "" {
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, cfg.APIKey)) cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, cfg.APIKey))
@ -1807,6 +2014,40 @@ func (s *ContentModerationService) isAPIKeyFrozen(key string, now time.Time) boo
return state != nil && state.FrozenUntil.After(now) return state != nil && state.FrozenUntil.After(now)
} }
func (s *ContentModerationService) beginModerationAPIKeyCall(key string) {
hash := moderationAPIKeyHash(key)
if hash == "" || s == nil {
return
}
s.keyHealthMu.Lock()
defer s.keyHealthMu.Unlock()
state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key))
state.SyncActive++
}
func (s *ContentModerationService) finishModerationAPIKeyCall(key string, latencyMS int, success bool) {
hash := moderationAPIKeyHash(key)
if hash == "" || s == nil {
return
}
if latencyMS < 0 {
latencyMS = 0
}
s.keyHealthMu.Lock()
defer s.keyHealthMu.Unlock()
state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key))
if state.SyncActive > 0 {
state.SyncActive--
}
state.SyncTotal++
state.SyncLatencyMS += int64(latencyMS)
if success {
state.SyncSuccess++
return
}
state.SyncErrors++
}
func (s *ContentModerationService) markAPIKeySuccess(key string, latencyMS int, httpStatus int) { func (s *ContentModerationService) markAPIKeySuccess(key string, latencyMS int, httpStatus int) {
hash := moderationAPIKeyHash(key) hash := moderationAPIKeyHash(key)
if hash == "" || s == nil { if hash == "" || s == nil {
@ -1926,6 +2167,71 @@ func (s *ContentModerationService) apiKeyStatuses(keys []string) []ContentModera
return out return out
} }
func (s *ContentModerationService) preBlockAPIKeyLoads(keys []string) []ContentModerationAPIKeyLoad {
out := make([]ContentModerationAPIKeyLoad, 0, len(keys))
for idx, key := range keys {
out = append(out, s.preBlockAPIKeyLoadForHash(idx, moderationAPIKeyHash(key), maskSecretTail(key)))
}
return out
}
func (s *ContentModerationService) preBlockAPIKeyActive(keys []string) int64 {
var total int64
for _, item := range s.preBlockAPIKeyLoads(keys) {
total += item.Active
}
return total
}
func (s *ContentModerationService) preBlockAPIKeyAvailableCount(keys []string) int64 {
now := time.Now()
var count int64
for _, key := range keys {
if !s.isAPIKeyFrozen(key, now) {
count++
}
}
return count
}
func (s *ContentModerationService) preBlockAPIKeyTotalCalls(keys []string) int64 {
var total int64
for _, item := range s.preBlockAPIKeyLoads(keys) {
total += item.Total
}
return total
}
func (s *ContentModerationService) preBlockAPIKeyLoadForHash(index int, hash string, masked string) ContentModerationAPIKeyLoad {
load := ContentModerationAPIKeyLoad{
Index: index,
KeyHash: hash,
Masked: masked,
Status: "unknown",
}
status := s.apiKeyStatusForHash(index, hash, masked, true)
load.Status = status.Status
load.LastLatencyMS = status.LastLatencyMS
load.LastHTTPStatus = status.LastHTTPStatus
if hash == "" || s == nil {
return load
}
s.keyHealthMu.Lock()
defer s.keyHealthMu.Unlock()
state := s.keyHealth[hash]
if state == nil {
return load
}
load.Active = state.SyncActive
load.Total = state.SyncTotal
load.Success = state.SyncSuccess
load.Errors = state.SyncErrors
if state.SyncTotal > 0 {
load.AvgLatencyMS = state.SyncLatencyMS / state.SyncTotal
}
return load
}
func (s *ContentModerationService) apiKeyStatusForHash(index int, hash string, masked string, configured bool) ContentModerationAPIKeyStatus { func (s *ContentModerationService) apiKeyStatusForHash(index int, hash string, masked string, configured bool) ContentModerationAPIKeyStatus {
status := ContentModerationAPIKeyStatus{ status := ContentModerationAPIKeyStatus{
Index: index, Index: index,

View File

@ -3,9 +3,11 @@ package service
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -73,10 +75,13 @@ func (r *contentModerationTestSettingRepo) Delete(ctx context.Context, key strin
} }
type contentModerationTestRepo struct { type contentModerationTestRepo struct {
mu sync.Mutex
logs []ContentModerationLog logs []ContentModerationLog
} }
func (r *contentModerationTestRepo) CreateLog(ctx context.Context, log *ContentModerationLog) error { func (r *contentModerationTestRepo) CreateLog(ctx context.Context, log *ContentModerationLog) error {
r.mu.Lock()
defer r.mu.Unlock()
if log != nil { if log != nil {
r.logs = append(r.logs, *log) r.logs = append(r.logs, *log)
} }
@ -88,14 +93,55 @@ func (r *contentModerationTestRepo) ListLogs(ctx context.Context, filter Content
} }
func (r *contentModerationTestRepo) CountFlaggedByUserSince(ctx context.Context, userID int64, since time.Time) (int, error) { func (r *contentModerationTestRepo) CountFlaggedByUserSince(ctx context.Context, userID int64, since time.Time) (int, error) {
return 0, nil r.mu.Lock()
defer r.mu.Unlock()
count := 0
for _, log := range r.logs {
if log.UserID == nil || *log.UserID != userID || !log.Flagged || log.Action == ContentModerationActionHashBlock {
continue
}
if log.CreatedAt.IsZero() || log.CreatedAt.Before(since) {
continue
}
count++
}
return count, nil
} }
func (r *contentModerationTestRepo) CleanupExpiredLogs(ctx context.Context, hitBefore time.Time, nonHitBefore time.Time) (*ContentModerationCleanupResult, error) { func (r *contentModerationTestRepo) CleanupExpiredLogs(ctx context.Context, hitBefore time.Time, nonHitBefore time.Time) (*ContentModerationCleanupResult, error) {
return &ContentModerationCleanupResult{}, nil return &ContentModerationCleanupResult{}, nil
} }
func (r *contentModerationTestRepo) snapshotLogs() []ContentModerationLog {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]ContentModerationLog, len(r.logs))
copy(out, r.logs)
return out
}
func requireContentModerationLogCount(t *testing.T, repo *contentModerationTestRepo, want int) []ContentModerationLog {
t.Helper()
var logs []ContentModerationLog
require.Eventually(t, func() bool {
logs = repo.snapshotLogs()
return len(logs) == want
}, time.Second, 10*time.Millisecond)
return logs
}
func requireRecordedHashCount(t *testing.T, cache *contentModerationTestHashCache, want int) []string {
t.Helper()
var hashes []string
require.Eventually(t, func() bool {
hashes = cache.snapshotRecorded()
return len(hashes) == want
}, time.Second, 10*time.Millisecond)
return hashes
}
type contentModerationTestHashCache struct { type contentModerationTestHashCache struct {
mu sync.Mutex
hashes map[string]struct{} hashes map[string]struct{}
recorded []string recorded []string
checked []string checked []string
@ -246,6 +292,8 @@ func (i *contentModerationTestAuthCacheInvalidator) InvalidateAuthCacheByGroupID
} }
func (c *contentModerationTestHashCache) RecordFlaggedInputHash(ctx context.Context, inputHash string) error { func (c *contentModerationTestHashCache) RecordFlaggedInputHash(ctx context.Context, inputHash string) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.hashes == nil { if c.hashes == nil {
c.hashes = map[string]struct{}{} c.hashes = map[string]struct{}{}
} }
@ -255,6 +303,8 @@ func (c *contentModerationTestHashCache) RecordFlaggedInputHash(ctx context.Cont
} }
func (c *contentModerationTestHashCache) HasFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) { func (c *contentModerationTestHashCache) HasFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.checked = append(c.checked, inputHash) c.checked = append(c.checked, inputHash)
if c.hasResultUsed { if c.hasResultUsed {
return c.hasResult, nil return c.hasResult, nil
@ -264,6 +314,8 @@ func (c *contentModerationTestHashCache) HasFlaggedInputHash(ctx context.Context
} }
func (c *contentModerationTestHashCache) DeleteFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) { func (c *contentModerationTestHashCache) DeleteFlaggedInputHash(ctx context.Context, inputHash string) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.deleted = append(c.deleted, inputHash) c.deleted = append(c.deleted, inputHash)
if c.hashes == nil { if c.hashes == nil {
return false, nil return false, nil
@ -276,15 +328,50 @@ func (c *contentModerationTestHashCache) DeleteFlaggedInputHash(ctx context.Cont
} }
func (c *contentModerationTestHashCache) ClearFlaggedInputHashes(ctx context.Context) (int64, error) { func (c *contentModerationTestHashCache) ClearFlaggedInputHashes(ctx context.Context) (int64, error) {
c.mu.Lock()
defer c.mu.Unlock()
deleted := int64(len(c.hashes)) deleted := int64(len(c.hashes))
c.hashes = map[string]struct{}{} c.hashes = map[string]struct{}{}
return deleted, nil return deleted, nil
} }
func (c *contentModerationTestHashCache) CountFlaggedInputHashes(ctx context.Context) (int64, error) { func (c *contentModerationTestHashCache) CountFlaggedInputHashes(ctx context.Context) (int64, error) {
c.mu.Lock()
defer c.mu.Unlock()
return int64(len(c.hashes)), nil return int64(len(c.hashes)), nil
} }
func (c *contentModerationTestHashCache) snapshotRecorded() []string {
c.mu.Lock()
defer c.mu.Unlock()
out := make([]string, len(c.recorded))
copy(out, c.recorded)
return out
}
func (c *contentModerationTestHashCache) snapshotChecked() []string {
c.mu.Lock()
defer c.mu.Unlock()
out := make([]string, len(c.checked))
copy(out, c.checked)
return out
}
func (c *contentModerationTestHashCache) hasHash(inputHash string) bool {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.hashes[inputHash]
return ok
}
func (c *contentModerationTestHashCache) snapshotDeleted() []string {
c.mu.Lock()
defer c.mu.Unlock()
out := make([]string, len(c.deleted))
copy(out, c.deleted)
return out
}
func TestBuildContentModerationLog_RedactsInputExcerpt(t *testing.T) { func TestBuildContentModerationLog_RedactsInputExcerpt(t *testing.T) {
svc := &ContentModerationService{} svc := &ContentModerationService{}
cfg := defaultContentModerationConfig() cfg := defaultContentModerationConfig()
@ -381,10 +468,10 @@ func TestContentModerationCheck_PreBlockKeywordHitSkipsUpstreamCall(t *testing.T
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action)
require.False(t, upstreamCalled, "keyword block must short-circuit upstream moderation call") require.False(t, upstreamCalled, "keyword block must short-circuit upstream moderation call")
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.True(t, repo.logs[0].Flagged) require.True(t, logs[0].Flagged)
require.Equal(t, ContentModerationActionKeywordBlock, repo.logs[0].Action) require.Equal(t, ContentModerationActionKeywordBlock, logs[0].Action)
require.Equal(t, contentModerationKeywordCategory, repo.logs[0].HighestCategory) require.Equal(t, contentModerationKeywordCategory, logs[0].HighestCategory)
} }
func TestContentModerationCheck_KeywordsIgnoredInObserveMode(t *testing.T) { func TestContentModerationCheck_KeywordsIgnoredInObserveMode(t *testing.T) {
@ -474,7 +561,7 @@ func TestContentModerationCheck_KeywordOnlyStrategySkipsAPIOnMiss(t *testing.T)
require.NoError(t, err) require.NoError(t, err)
require.True(t, decision.Allowed, "keyword-only must allow misses without calling the API") require.True(t, decision.Allowed, "keyword-only must allow misses without calling the API")
require.False(t, upstreamCalled, "keyword-only must not call the upstream moderation API") require.False(t, upstreamCalled, "keyword-only must not call the upstream moderation API")
require.Len(t, repo.logs, 0) require.Len(t, repo.snapshotLogs(), 0)
} }
func TestContentModerationCheck_APIOnlyStrategyIgnoresKeywordList(t *testing.T) { func TestContentModerationCheck_APIOnlyStrategyIgnoresKeywordList(t *testing.T) {
@ -545,7 +632,7 @@ func TestContentModerationCheck_ModelFilterAllAuditsEveryModel(t *testing.T) {
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action)
} }
require.Len(t, repo.logs, 2) requireContentModerationLogCount(t, repo, 2)
} }
func TestContentModerationCheck_ModelFilterIncludeOnlyAuditsListedModels(t *testing.T) { func TestContentModerationCheck_ModelFilterIncludeOnlyAuditsListedModels(t *testing.T) {
@ -571,8 +658,8 @@ func TestContentModerationCheck_ModelFilterIncludeOnlyAuditsListedModels(t *test
require.True(t, decision.Allowed) require.True(t, decision.Allowed)
require.False(t, decision.Blocked) require.False(t, decision.Blocked)
require.Equal(t, ContentModerationActionAllow, decision.Action) require.Equal(t, ContentModerationActionAllow, decision.Action)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.Equal(t, "gpt-5.5", repo.logs[0].Model) require.Equal(t, "gpt-5.5", logs[0].Model)
} }
func TestContentModerationCheck_ModelFilterExcludeSkipsListedModels(t *testing.T) { func TestContentModerationCheck_ModelFilterExcludeSkipsListedModels(t *testing.T) {
@ -598,8 +685,8 @@ func TestContentModerationCheck_ModelFilterExcludeSkipsListedModels(t *testing.T
require.True(t, decision.Allowed) require.True(t, decision.Allowed)
require.False(t, decision.Blocked) require.False(t, decision.Blocked)
require.Equal(t, ContentModerationActionAllow, decision.Action) require.Equal(t, ContentModerationActionAllow, decision.Action)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.Equal(t, "gpt-5.5", repo.logs[0].Model) require.Equal(t, "gpt-5.5", logs[0].Model)
} }
func TestContentModerationLoadConfig_LegacyConfigDefaultsModelFilterToAll(t *testing.T) { func TestContentModerationLoadConfig_LegacyConfigDefaultsModelFilterToAll(t *testing.T) {
@ -639,8 +726,8 @@ func TestContentModerationCheck_ModelFilterUsesRequestedModelNotBodyModel(t *tes
require.NoError(t, err) require.NoError(t, err)
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
require.Equal(t, ContentModerationActionKeywordBlock, decision.Action) require.Equal(t, ContentModerationActionKeywordBlock, decision.Action)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.Equal(t, "gpt-5.5", repo.logs[0].Model) require.Equal(t, "gpt-5.5", logs[0].Model)
} }
func defaultContentModerationModelFilterTestConfig() *ContentModerationConfig { func defaultContentModerationModelFilterTestConfig() *ContentModerationConfig {
@ -939,11 +1026,11 @@ func TestContentModerationCheck_OpenAIResponsesRecordsNonHitForCodexPayload(t *t
require.NoError(t, err) require.NoError(t, err)
require.False(t, decision.Blocked) require.False(t, decision.Blocked)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.False(t, repo.logs[0].Flagged) require.False(t, logs[0].Flagged)
require.Equal(t, ContentModerationActionAllow, repo.logs[0].Action) require.Equal(t, ContentModerationActionAllow, logs[0].Action)
require.Equal(t, "/responses", repo.logs[0].Endpoint) require.Equal(t, "/responses", logs[0].Endpoint)
require.Equal(t, "last user prompt", repo.logs[0].InputExcerpt) require.Equal(t, "last user prompt", logs[0].InputExcerpt)
require.Equal(t, "last user prompt", moderationRequest.Input) require.Equal(t, "last user prompt", moderationRequest.Input)
} }
@ -1007,14 +1094,164 @@ func TestContentModerationCheck_PreBlockBlocksCodexResponsesLatestUserInput(t *t
require.Equal(t, ContentModerationActionBlock, decision.Action) require.Equal(t, ContentModerationActionBlock, decision.Action)
require.Equal(t, http.StatusUnavailableForLegalReasons, decision.StatusCode) require.Equal(t, http.StatusUnavailableForLegalReasons, decision.StatusCode)
require.Equal(t, "内容审计测试阻断", decision.Message) require.Equal(t, "内容审计测试阻断", decision.Message)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 1)
require.True(t, repo.logs[0].Flagged) require.True(t, logs[0].Flagged)
require.Equal(t, ContentModerationActionBlock, repo.logs[0].Action) require.Equal(t, ContentModerationActionBlock, logs[0].Action)
require.Equal(t, ContentModerationModePreBlock, repo.logs[0].Mode) require.Equal(t, ContentModerationModePreBlock, logs[0].Mode)
require.Equal(t, "latest blocked prompt", repo.logs[0].InputExcerpt) require.Equal(t, "latest blocked prompt", logs[0].InputExcerpt)
require.Equal(t, "latest blocked prompt", moderationRequest.Input) require.Equal(t, "latest blocked prompt", moderationRequest.Input)
} }
func TestContentModerationStatusTracksPreBlockSyncMetrics(t *testing.T) {
var requestCount int
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
score := 0.01
if requestCount == 1 {
score = 0.9
}
time.Sleep(5 * time.Millisecond)
_ = json.NewEncoder(w).Encode(moderationAPIResponse{
Results: []moderationAPIResult{{
CategoryScores: map[string]float64{"sexual": score},
}},
})
}))
defer server.Close()
cfg := defaultContentModerationConfig()
cfg.Enabled = true
cfg.Mode = ContentModerationModePreBlock
cfg.BaseURL = server.URL
cfg.APIKeys = []string{"sk-test"}
rawCfg, err := json.Marshal(cfg)
require.NoError(t, err)
svc := NewContentModerationService(
&contentModerationTestSettingRepo{values: map[string]string{
SettingKeyRiskControlEnabled: "true",
SettingKeyContentModerationConfig: string(rawCfg),
}},
&contentModerationTestRepo{},
&contentModerationTestHashCache{},
nil,
nil,
nil,
nil,
)
for _, prompt := range []string{"blocked prompt", "clean prompt"} {
_, err := svc.Check(context.Background(), ContentModerationCheckInput{
UserID: 1001,
Protocol: ContentModerationProtocolOpenAIChat,
Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":%q}]}`, prompt)),
})
require.NoError(t, err)
}
status, err := svc.GetStatus(context.Background())
require.NoError(t, err)
require.Equal(t, int64(2), status.PreBlockChecked)
require.Equal(t, int64(1), status.PreBlockAllowed)
require.Equal(t, int64(1), status.PreBlockBlocked)
require.Equal(t, int64(0), status.PreBlockErrors)
require.Equal(t, 0, status.PreBlockActive)
require.GreaterOrEqual(t, status.PreBlockAvgLatencyMS, int64(1))
}
func TestContentModerationStatusTracksPreBlockAPIKeyLoad(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(moderationAPIResponse{
Results: []moderationAPIResult{{
CategoryScores: map[string]float64{"sexual": 0.01},
}},
})
}))
defer server.Close()
cfg := defaultContentModerationConfig()
cfg.Enabled = true
cfg.Mode = ContentModerationModePreBlock
cfg.BaseURL = server.URL
cfg.APIKeys = []string{"sk-one", "sk-two"}
rawCfg, err := json.Marshal(cfg)
require.NoError(t, err)
svc := NewContentModerationService(
&contentModerationTestSettingRepo{values: map[string]string{
SettingKeyRiskControlEnabled: "true",
SettingKeyContentModerationConfig: string(rawCfg),
}},
&contentModerationTestRepo{},
&contentModerationTestHashCache{},
nil,
nil,
nil,
nil,
)
for idx := 0; idx < 4; idx++ {
_, err := svc.Check(context.Background(), ContentModerationCheckInput{
UserID: 1001,
Protocol: ContentModerationProtocolOpenAIChat,
Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":"prompt %d"}]}`, idx)),
})
require.NoError(t, err)
}
status, err := svc.GetStatus(context.Background())
require.NoError(t, err)
require.Len(t, status.PreBlockAPIKeyLoads, 2)
require.Equal(t, int64(4), status.PreBlockAPIKeyTotalCalls)
require.Equal(t, int64(2), status.PreBlockAPIKeyAvailableCount)
require.Equal(t, int64(0), status.PreBlockAPIKeyActive)
require.Equal(t, int64(0), status.PreBlockAPIKeyLoads[0].Active)
require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[0].Total)
require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[0].Success)
require.Equal(t, int64(0), status.PreBlockAPIKeyLoads[0].Errors)
require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[1].Total)
require.Equal(t, int64(2), status.PreBlockAPIKeyLoads[1].Success)
}
func TestContentModerationStatusTracksPreBlockLocalBlocks(t *testing.T) {
cfg := defaultContentModerationConfig()
cfg.Enabled = true
cfg.Mode = ContentModerationModePreBlock
cfg.KeywordBlockingMode = ContentModerationKeywordModeKeywordOnly
cfg.BlockedKeywords = []string{"blocked"}
rawCfg, err := json.Marshal(cfg)
require.NoError(t, err)
svc := NewContentModerationService(
&contentModerationTestSettingRepo{values: map[string]string{
SettingKeyRiskControlEnabled: "true",
SettingKeyContentModerationConfig: string(rawCfg),
}},
&contentModerationTestRepo{},
&contentModerationTestHashCache{},
nil,
nil,
nil,
nil,
)
for _, prompt := range []string{"blocked prompt", "clean prompt"} {
_, err := svc.Check(context.Background(), ContentModerationCheckInput{
UserID: 1001,
Protocol: ContentModerationProtocolOpenAIChat,
Body: []byte(fmt.Sprintf(`{"messages":[{"role":"user","content":%q}]}`, prompt)),
})
require.NoError(t, err)
}
status, err := svc.GetStatus(context.Background())
require.NoError(t, err)
require.Equal(t, int64(2), status.PreBlockChecked)
require.Equal(t, int64(1), status.PreBlockAllowed)
require.Equal(t, int64(1), status.PreBlockBlocked)
require.Equal(t, int64(0), status.PreBlockErrors)
}
func TestBuildContentModerationTestAuditResult_UsesConfiguredThresholdsOnly(t *testing.T) { func TestBuildContentModerationTestAuditResult_UsesConfiguredThresholdsOnly(t *testing.T) {
result := buildContentModerationTestAuditResult(&moderationAPIResult{ result := buildContentModerationTestAuditResult(&moderationAPIResult{
Flagged: true, Flagged: true,
@ -1137,6 +1374,8 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) {
cfg.APIKeys = []string{"sk-test"} cfg.APIKeys = []string{"sk-test"}
cfg.BlockStatus = http.StatusConflict cfg.BlockStatus = http.StatusConflict
cfg.BlockMessage = "命中历史风险输入" cfg.BlockMessage = "命中历史风险输入"
cfg.AutoBanEnabled = true
cfg.BanThreshold = 1
rawCfg, err := json.Marshal(cfg) rawCfg, err := json.Marshal(cfg)
require.NoError(t, err) require.NoError(t, err)
@ -1145,20 +1384,23 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) {
content.Normalize() content.Normalize()
hashCache.hashes[content.Hash()] = struct{}{} hashCache.hashes[content.Hash()] = struct{}{}
repo := &contentModerationTestRepo{}
userRepo := &contentModerationTestUserRepo{user: &User{ID: 1001, Status: StatusActive}}
svc := NewContentModerationService( svc := NewContentModerationService(
&contentModerationTestSettingRepo{values: map[string]string{ &contentModerationTestSettingRepo{values: map[string]string{
SettingKeyRiskControlEnabled: "true", SettingKeyRiskControlEnabled: "true",
SettingKeyContentModerationConfig: string(rawCfg), SettingKeyContentModerationConfig: string(rawCfg),
}}, }},
&contentModerationTestRepo{}, repo,
hashCache, hashCache,
nil, nil,
nil, userRepo,
nil, nil,
nil, nil,
) )
decision, err := svc.Check(context.Background(), ContentModerationCheckInput{ decision, err := svc.Check(context.Background(), ContentModerationCheckInput{
UserID: 1001,
Protocol: ContentModerationProtocolOpenAIChat, Protocol: ContentModerationProtocolOpenAIChat,
Body: []byte(`{"messages":[{"role":"user","content":"blocked prompt"}]}`), Body: []byte(`{"messages":[{"role":"user","content":"blocked prompt"}]}`),
}) })
@ -1169,7 +1411,73 @@ func TestContentModerationCheck_PreHashUsesRedisHashCache(t *testing.T) {
require.Equal(t, content.Hash(), decision.InputHash) require.Equal(t, content.Hash(), decision.InputHash)
require.Contains(t, decision.Message, "命中历史风险输入") require.Contains(t, decision.Message, "命中历史风险输入")
require.Contains(t, decision.Message, content.Hash()) require.Contains(t, decision.Message, content.Hash())
require.Len(t, hashCache.checked, 1) require.Len(t, hashCache.snapshotChecked(), 1)
logs := requireContentModerationLogCount(t, repo, 1)
require.True(t, logs[0].Flagged)
require.Equal(t, ContentModerationActionHashBlock, logs[0].Action)
require.Equal(t, 1.0, logs[0].CategoryScores["hash"])
require.Equal(t, ContentModerationModePreBlock, logs[0].Mode)
require.Zero(t, logs[0].ViolationCount)
require.False(t, logs[0].AutoBanned)
require.Empty(t, userRepo.updated)
}
func TestContentModerationCheck_HashBlockLogsDoNotIncreaseNextViolationCount(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(moderationAPIResponse{
Results: []moderationAPIResult{{
CategoryScores: map[string]float64{"sexual": 0.9},
}},
})
}))
defer server.Close()
cfg := defaultContentModerationConfig()
cfg.Enabled = true
cfg.Mode = ContentModerationModePreBlock
cfg.BaseURL = server.URL
cfg.APIKeys = []string{"sk-test"}
cfg.AutoBanEnabled = false
rawCfg, err := json.Marshal(cfg)
require.NoError(t, err)
userID := int64(1001)
repo := &contentModerationTestRepo{}
hashLog := &ContentModerationLog{
UserID: &userID,
Action: ContentModerationActionHashBlock,
Flagged: true,
HighestCategory: "hash",
HighestScore: 1,
CreatedAt: time.Now(),
}
require.NoError(t, repo.CreateLog(context.Background(), hashLog))
svc := NewContentModerationService(
&contentModerationTestSettingRepo{values: map[string]string{
SettingKeyRiskControlEnabled: "true",
SettingKeyContentModerationConfig: string(rawCfg),
}},
repo,
&contentModerationTestHashCache{},
nil,
nil,
nil,
nil,
)
decision, err := svc.Check(context.Background(), ContentModerationCheckInput{
UserID: userID,
Protocol: ContentModerationProtocolOpenAIChat,
Body: []byte(`{"messages":[{"role":"user","content":"new blocked prompt"}]}`),
})
require.NoError(t, err)
require.True(t, decision.Blocked)
logs := requireContentModerationLogCount(t, repo, 2)
require.Equal(t, ContentModerationActionHashBlock, logs[0].Action)
require.Equal(t, ContentModerationActionBlock, logs[1].Action)
require.Equal(t, 1, logs[1].ViolationCount)
} }
func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T) { func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T) {
@ -1219,8 +1527,8 @@ func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
require.Equal(t, ContentModerationActionBlock, decision.Action) require.Equal(t, ContentModerationActionBlock, decision.Action)
require.Equal(t, 1, requestCount) require.Equal(t, 1, requestCount)
require.Len(t, hashCache.recorded, 1) recorded := requireRecordedHashCount(t, hashCache, 1)
require.Len(t, repo.logs, 1) requireContentModerationLogCount(t, repo, 1)
decision, err = svc.Check(context.Background(), ContentModerationCheckInput{ decision, err = svc.Check(context.Background(), ContentModerationCheckInput{
Protocol: ContentModerationProtocolOpenAIChat, Protocol: ContentModerationProtocolOpenAIChat,
@ -1229,9 +1537,11 @@ func TestContentModerationCheck_PreBlockFlaggedWritesRedisHashCache(t *testing.T
require.NoError(t, err) require.NoError(t, err)
require.True(t, decision.Blocked) require.True(t, decision.Blocked)
require.Equal(t, ContentModerationActionHashBlock, decision.Action) require.Equal(t, ContentModerationActionHashBlock, decision.Action)
require.Equal(t, hashCache.recorded[0], decision.InputHash) require.Equal(t, recorded[0], decision.InputHash)
require.Equal(t, 1, requestCount) require.Equal(t, 1, requestCount)
require.Len(t, repo.logs, 1) logs := requireContentModerationLogCount(t, repo, 2)
require.Equal(t, ContentModerationActionBlock, logs[0].Action)
require.Equal(t, ContentModerationActionHashBlock, logs[1].Action)
} }
func TestContentModerationDeleteFlaggedInputHash_NormalizesAndDeletes(t *testing.T) { func TestContentModerationDeleteFlaggedInputHash_NormalizesAndDeletes(t *testing.T) {
@ -1246,8 +1556,8 @@ func TestContentModerationDeleteFlaggedInputHash_NormalizesAndDeletes(t *testing
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, existingHash, result.InputHash) require.Equal(t, existingHash, result.InputHash)
require.True(t, result.Deleted) require.True(t, result.Deleted)
require.NotContains(t, hashCache.hashes, existingHash) require.False(t, hashCache.hasHash(existingHash))
require.Equal(t, []string{existingHash}, hashCache.deleted) require.Equal(t, []string{existingHash}, hashCache.snapshotDeleted())
result, err = svc.DeleteFlaggedInputHash(context.Background(), existingHash) result, err = svc.DeleteFlaggedInputHash(context.Background(), existingHash)
@ -1327,8 +1637,8 @@ func TestContentModerationCheck_AsyncFlaggedWritesRedisHashCache(t *testing.T) {
}, cfg, ContentModerationInput{Text: "bad prompt"}, strings.Repeat("b", 64), contentModerationIntPtr(25), false) }, cfg, ContentModerationInput{Text: "bad prompt"}, strings.Repeat("b", 64), contentModerationIntPtr(25), false)
require.False(t, decision.Blocked) require.False(t, decision.Blocked)
require.Len(t, hashCache.recorded, 1) requireRecordedHashCount(t, hashCache, 1)
require.Len(t, repo.logs, 1) requireContentModerationLogCount(t, repo, 1)
} }
func TestBuildContentModerationAccountDisabledEmailBody_ContainsBanDetails(t *testing.T) { func TestBuildContentModerationAccountDisabledEmailBody_ContainsBanDetails(t *testing.T) {

View File

@ -132,6 +132,16 @@ export interface ContentModerationRuntimeStatus {
dropped: number dropped: number
processed: number processed: number
errors: number errors: number
pre_block_active: number
pre_block_checked: number
pre_block_allowed: number
pre_block_blocked: number
pre_block_errors: number
pre_block_avg_latency_ms: number
pre_block_api_key_active: number
pre_block_api_key_available_count: number
pre_block_api_key_total_calls: number
pre_block_api_key_loads: ContentModerationAPIKeyLoad[]
api_key_statuses: ContentModerationAPIKeyStatus[] api_key_statuses: ContentModerationAPIKeyStatus[]
flagged_hash_count: number flagged_hash_count: number
last_cleanup_at?: string last_cleanup_at?: string
@ -139,6 +149,20 @@ export interface ContentModerationRuntimeStatus {
last_cleanup_deleted_non_hit: number last_cleanup_deleted_non_hit: number
} }
export interface ContentModerationAPIKeyLoad {
index: number
key_hash: string
masked: string
status: ContentModerationAPIKeyStatusValue
active: number
total: number
success: number
errors: number
avg_latency_ms: number
last_latency_ms: number
last_http_status: number
}
export interface ContentModerationLog { export interface ContentModerationLog {
id: number id: number
request_id: string request_id: string

View File

@ -0,0 +1,24 @@
import { describe, expect, it } from 'vitest'
import en from '../locales/en'
import zh from '../locales/zh'
describe('risk control locale copy', () => {
it('describes worker runtime as audit and pre-block record processing', () => {
expect(zh.admin.riskControl.workerStatusHint).toContain('前置拦截记录任务')
expect(zh.admin.riskControl.workerStatusHint).not.toContain('异步观察任务')
expect(en.admin.riskControl.workerStatusHint).toContain('pre-block record tasks')
expect(en.admin.riskControl.workerStatusHint).not.toContain('observation tasks')
})
it('keeps pre-block audit key summary aware of async worker load', () => {
expect(zh.admin.riskControl.preBlockAPIKeyLoadSummary).toContain('worker{workerActive} / {workerTotal}')
expect(en.admin.riskControl.preBlockAPIKeyLoadSummary).toContain('worker: {workerActive} / {workerTotal}')
})
it('does not describe pre-block audit key polling as bypassing the worker pool', () => {
expect(zh.admin.riskControl.preBlockAPIKeyLoadHint).toBe('同步前置拦截直接轮询可用审核 Key。')
expect(zh.admin.riskControl.preBlockAPIKeyLoadHint).not.toContain('Worker 池')
expect(en.admin.riskControl.preBlockAPIKeyLoadHint).not.toContain('worker pool')
})
})

View File

@ -2599,14 +2599,37 @@ export default {
modelFilterIncludeSummary: 'Applies to {count} models', modelFilterIncludeSummary: 'Applies to {count} models',
modelFilterExcludeSummary: 'Excludes {count} models', modelFilterExcludeSummary: 'Excludes {count} models',
emptyLogs: 'No audit records', emptyLogs: 'No audit records',
preBlockSyncStatus: 'Pre-Block Sync Status',
preBlockSyncHint: 'Live counters for the synchronous moderation path, excluding async record tasks.',
preBlockActive: 'Sync Processing',
preBlockActiveHint: 'Currently checking',
preBlockChecked: 'Checked',
preBlockCheckedHint: 'Entered pre-block path',
preBlockAllowed: 'Allowed',
preBlockAllowedHint: 'No block triggered',
preBlockBlocked: 'Blocked',
preBlockBlockedHint: 'Rejected after hit',
preBlockErrors: 'Audit Errors',
preBlockErrorsHint: 'Failed or no usable key',
preBlockAvgLatency: 'Avg Latency',
preBlockAvgLatencyHint: 'Synchronous path average',
preBlockAPIKeyLoad: 'Audit Key Load',
preBlockAPIKeyLoadHint: 'Synchronous pre-block checks round-robin usable audit keys directly.',
preBlockAPIKeyLoadSummary: 'Sync active {active} / usable keys {available}, {total} total, worker: {workerActive} / {workerTotal}',
preBlockAPIKeyTotals: 'Total {total}, success {success}, errors {errors}',
preBlockAPIKeyLoadEmpty: 'No audit key load data yet',
preBlockKeyActiveShort: 'Active',
preBlockKeyTotalShort: 'Total',
preBlockKeyAvgShort: 'Avg',
preBlockKeyLastShort: 'Last',
workerStatus: 'Worker Runtime', workerStatus: 'Worker Runtime',
workerStatusHint: 'Queue and worker pool status for asynchronous observation tasks.', workerStatusHint: 'Queue and worker pool status for async audit tasks and pre-block record tasks, excluding synchronous pre-block checks.',
workerPool: 'Worker Pool', workerPool: 'Worker Pool',
workerPoolMeta: '{active} processing, {idle} idle and ready, {total} total', workerPoolMeta: '{active} processing, {idle} idle and ready, {total} total',
queueUsage: 'Queue Usage', queueUsage: 'Queue Usage',
activeWorkers: 'Processing', activeWorkers: 'Processing',
idleWorkers: 'Idle Ready', idleWorkers: 'Idle Ready',
workerActive: 'Processing an asynchronous audit task', workerActive: 'Processing an async audit or record task',
workerIdle: 'Started, idle and ready', workerIdle: 'Started, idle and ready',
workerDisabled: 'Risk control or content audit is disabled', workerDisabled: 'Risk control or content audit is disabled',
processed: 'Processed', processed: 'Processed',

View File

@ -2676,14 +2676,37 @@ export default {
modelFilterIncludeSummary: '仅 {count} 个模型生效', modelFilterIncludeSummary: '仅 {count} 个模型生效',
modelFilterExcludeSummary: '排除 {count} 个模型', modelFilterExcludeSummary: '排除 {count} 个模型',
emptyLogs: '暂无审核记录', emptyLogs: '暂无审核记录',
preBlockSyncStatus: '前置拦截同步状态',
preBlockSyncHint: '同步审核链路的实时计数,不包含异步写记录任务。',
preBlockActive: '同步处理中',
preBlockActiveHint: '当前正在审核',
preBlockChecked: '已检查',
preBlockCheckedHint: '进入前置拦截链路',
preBlockAllowed: '已放行',
preBlockAllowedHint: '未触发拦截',
preBlockBlocked: '已拦截',
preBlockBlockedHint: '命中后拒绝请求',
preBlockErrors: '审核异常',
preBlockErrorsHint: '失败或无可用 Key',
preBlockAvgLatency: '平均耗时',
preBlockAvgLatencyHint: '同步链路平均值',
preBlockAPIKeyLoad: '审核 Key 负载',
preBlockAPIKeyLoadHint: '同步前置拦截直接轮询可用审核 Key。',
preBlockAPIKeyLoadSummary: '同步并发 {active} / 可用 Key {available},累计 {total} 次worker{workerActive} / {workerTotal}',
preBlockAPIKeyTotals: '累计 {total},成功 {success},异常 {errors}',
preBlockAPIKeyLoadEmpty: '暂无审核 Key 负载数据',
preBlockKeyActiveShort: '并发',
preBlockKeyTotalShort: '累计',
preBlockKeyAvgShort: '平均',
preBlockKeyLastShort: '最近',
workerStatus: 'Worker 运行状态', workerStatus: 'Worker 运行状态',
workerStatusHint: '异步观察任务的队列和 worker 池状态。', workerStatusHint: '异步审计任务和前置拦截记录任务的队列与 Worker 池状态,不包含同步前置拦截审核请求。',
workerPool: 'Worker 池', workerPool: 'Worker 池',
workerPoolMeta: '{active} 个处理中,{idle} 个空闲可用,共 {total} 个', workerPoolMeta: '{active} 个处理中,{idle} 个空闲可用,共 {total} 个',
queueUsage: '队列占用', queueUsage: '队列占用',
activeWorkers: '处理中', activeWorkers: '处理中',
idleWorkers: '空闲可用', idleWorkers: '空闲可用',
workerActive: '正在处理异步审计任务', workerActive: '正在处理异步审计或记录任务',
workerIdle: '已启动,当前空闲可用', workerIdle: '已启动,当前空闲可用',
workerDisabled: '风控或内容审计未启用', workerDisabled: '风控或内容审计未启用',
processed: '已处理', processed: '已处理',

View File

@ -53,7 +53,105 @@
</div> </div>
</div> </div>
<div class="card"> <div
v-if="showPreBlockRuntimeCard"
data-test="pre-block-runtime-cards"
class="grid grid-cols-1 gap-6 xl:grid-cols-[minmax(0,520px)_minmax(0,1fr)]"
>
<div data-test="pre-block-sync-card" class="card">
<div class="flex flex-col gap-4 border-b border-gray-100 px-6 py-4 dark:border-dark-700 lg:flex-row lg:items-center lg:justify-between">
<div>
<h2 class="text-lg font-semibold text-gray-900 dark:text-white">{{ t('admin.riskControl.preBlockSyncStatus') }}</h2>
<p class="mt-1 text-sm text-gray-500 dark:text-gray-400">{{ t('admin.riskControl.preBlockSyncHint') }}</p>
</div>
<span class="inline-flex w-fit items-center rounded-full bg-gray-100 px-2.5 py-1 text-xs font-medium text-gray-600 dark:bg-dark-700 dark:text-gray-300">
{{ modeLabel(status?.mode ?? configForm.mode) }}
</span>
</div>
<div class="p-6">
<div data-test="pre-block-metric-grid" class="grid grid-cols-2 gap-3 md:grid-cols-3">
<div
v-for="item in preBlockMetricItems"
:key="item.key"
class="rounded-lg p-4"
:class="item.class"
>
<p class="text-xs text-gray-500 dark:text-gray-400">{{ item.label }}</p>
<p class="mt-2 truncate text-2xl font-semibold leading-8" :class="item.valueClass">{{ item.value }}</p>
<p v-if="item.meta" class="mt-1 truncate text-xs text-gray-500 dark:text-gray-400">{{ item.meta }}</p>
</div>
</div>
</div>
</div>
<div data-test="pre-block-api-key-load-card" class="card">
<div class="flex flex-col gap-4 border-b border-gray-100 px-6 py-4 dark:border-dark-700 lg:flex-row lg:items-center lg:justify-between">
<div>
<h2 class="text-lg font-semibold text-gray-900 dark:text-white">{{ t('admin.riskControl.preBlockAPIKeyLoad') }}</h2>
<p class="mt-1 text-sm text-gray-500 dark:text-gray-400">
{{ t('admin.riskControl.preBlockAPIKeyLoadHint') }}
</p>
</div>
<span class="inline-flex w-fit items-center rounded-full bg-gray-100 px-2.5 py-1 text-xs font-medium text-gray-600 dark:bg-dark-700 dark:text-gray-300">
{{ preBlockAPIKeyLoadSummaryText }}
</span>
</div>
<div class="p-6">
<div
v-if="preBlockAPIKeyLoads.length > 0"
data-test="pre-block-api-key-load-list"
class="max-h-[280px] space-y-3 overflow-y-auto pr-1"
>
<div
v-for="item in preBlockAPIKeyLoads"
:key="item.key_hash || item.index"
class="rounded-lg bg-gray-50 p-3 dark:bg-dark-700/50"
>
<div class="flex min-w-0 flex-col gap-2 sm:flex-row sm:items-center sm:justify-between">
<div class="min-w-0">
<div class="flex min-w-0 items-center gap-2">
<span class="font-mono text-sm font-semibold text-gray-900 dark:text-white">#{{ item.index + 1 }}</span>
<span class="truncate font-mono text-sm text-gray-700 dark:text-gray-200">{{ item.masked || '-' }}</span>
<span class="h-2 w-2 flex-shrink-0 rounded-full" :class="apiKeyStatusDotClass(item.status)"></span>
</div>
<p class="mt-1 text-xs text-gray-500 dark:text-gray-400">
{{ t('admin.riskControl.preBlockAPIKeyTotals', { total: formatNumber(item.total), success: formatNumber(item.success), errors: formatNumber(item.errors) }) }}
</p>
</div>
<div class="grid grid-cols-4 gap-2 text-right text-xs text-gray-500 dark:text-gray-400 sm:min-w-[280px]">
<div>
<p>{{ t('admin.riskControl.preBlockKeyActiveShort') }}</p>
<p class="mt-1 text-sm font-semibold text-sky-700 dark:text-sky-300">{{ formatNumber(item.active) }}</p>
</div>
<div>
<p>{{ t('admin.riskControl.preBlockKeyTotalShort') }}</p>
<p class="mt-1 text-sm font-semibold text-gray-900 dark:text-white">{{ formatNumber(item.total) }}</p>
</div>
<div>
<p>{{ t('admin.riskControl.preBlockKeyAvgShort') }}</p>
<p class="mt-1 text-sm font-semibold text-gray-900 dark:text-white">{{ formatNumber(item.avg_latency_ms) }} ms</p>
</div>
<div>
<p>{{ t('admin.riskControl.preBlockKeyLastShort') }}</p>
<p class="mt-1 text-sm font-semibold text-gray-900 dark:text-white">{{ formatNumber(item.last_latency_ms) }} ms</p>
</div>
</div>
</div>
<div class="mt-3 h-1.5 overflow-hidden rounded-full bg-white dark:bg-dark-900">
<div class="h-full rounded-full bg-sky-500" :style="{ width: preBlockAPIKeyLoadWidth(item.total) }"></div>
</div>
</div>
</div>
<p v-else class="rounded-lg bg-gray-50 p-4 text-sm text-gray-500 dark:bg-dark-700/50 dark:text-gray-400">
{{ t('admin.riskControl.preBlockAPIKeyLoadEmpty') }}
</p>
</div>
</div>
</div>
<div v-if="showWorkerRuntimeCard" class="card">
<div class="flex flex-col gap-4 border-b border-gray-100 px-6 py-4 dark:border-dark-700 lg:flex-row lg:items-center lg:justify-between"> <div class="flex flex-col gap-4 border-b border-gray-100 px-6 py-4 dark:border-dark-700 lg:flex-row lg:items-center lg:justify-between">
<div> <div>
<h2 class="text-lg font-semibold text-gray-900 dark:text-white">{{ t('admin.riskControl.workerStatus') }}</h2> <h2 class="text-lg font-semibold text-gray-900 dark:text-white">{{ t('admin.riskControl.workerStatus') }}</h2>
@ -1013,6 +1111,7 @@ import Pagination from '@/components/common/Pagination.vue'
import ModelWhitelistSelector from '@/components/account/ModelWhitelistSelector.vue' import ModelWhitelistSelector from '@/components/account/ModelWhitelistSelector.vue'
import { adminAPI } from '@/api/admin' import { adminAPI } from '@/api/admin'
import type { import type {
ContentModerationAPIKeyLoad,
ContentModerationAPIKeyStatus, ContentModerationAPIKeyStatus,
ContentModerationConfig, ContentModerationConfig,
ContentModerationLog, ContentModerationLog,
@ -1472,6 +1571,81 @@ const queueUsageStyle = computed(() => ({
width: queueUsagePercent.value, width: queueUsagePercent.value,
})) }))
const runtimeMode = computed<ModerationMode>(() => status.value?.mode ?? configForm.mode)
const showPreBlockRuntimeCard = computed(() => runtimeMode.value === 'pre_block')
const showWorkerRuntimeCard = computed(() => runtimeMode.value === 'observe')
const preBlockMetricItems = computed(() => [
{
key: 'active',
label: t('admin.riskControl.preBlockActive'),
value: formatNumber(status.value?.pre_block_active ?? 0),
meta: t('admin.riskControl.preBlockActiveHint'),
class: 'bg-sky-50 dark:bg-sky-900/10',
valueClass: 'text-sky-700 dark:text-sky-300',
},
{
key: 'checked',
label: t('admin.riskControl.preBlockChecked'),
value: formatNumber(status.value?.pre_block_checked ?? 0),
meta: t('admin.riskControl.preBlockCheckedHint'),
class: 'bg-gray-50 dark:bg-dark-700/50',
valueClass: 'text-gray-900 dark:text-white',
},
{
key: 'allowed',
label: t('admin.riskControl.preBlockAllowed'),
value: formatNumber(status.value?.pre_block_allowed ?? 0),
meta: t('admin.riskControl.preBlockAllowedHint'),
class: 'bg-emerald-50 dark:bg-emerald-900/10',
valueClass: 'text-emerald-700 dark:text-emerald-300',
},
{
key: 'blocked',
label: t('admin.riskControl.preBlockBlocked'),
value: formatNumber(status.value?.pre_block_blocked ?? 0),
meta: t('admin.riskControl.preBlockBlockedHint'),
class: 'bg-rose-50 dark:bg-rose-900/10',
valueClass: 'text-rose-700 dark:text-rose-300',
},
{
key: 'errors',
label: t('admin.riskControl.preBlockErrors'),
value: formatNumber(status.value?.pre_block_errors ?? 0),
meta: t('admin.riskControl.preBlockErrorsHint'),
class: 'bg-amber-50 dark:bg-amber-900/10',
valueClass: 'text-amber-700 dark:text-amber-300',
},
{
key: 'latency',
label: t('admin.riskControl.preBlockAvgLatency'),
value: `${formatNumber(status.value?.pre_block_avg_latency_ms ?? 0)} ms`,
meta: t('admin.riskControl.preBlockAvgLatencyHint'),
class: 'bg-violet-50 dark:bg-violet-900/10',
valueClass: 'text-violet-700 dark:text-violet-300',
},
])
const preBlockAPIKeyLoads = computed<ContentModerationAPIKeyLoad[]>(() => (
[...(status.value?.pre_block_api_key_loads ?? [])].sort((a, b) => a.index - b.index)
))
const preBlockAPIKeyMaxTotal = computed(() => Math.max(1, ...preBlockAPIKeyLoads.value.map((item) => item.total || 0)))
const preBlockAPIKeyLoadSummaryText = computed(() => t('admin.riskControl.preBlockAPIKeyLoadSummary', {
active: formatNumber(status.value?.pre_block_api_key_active ?? 0),
available: formatNumber(status.value?.pre_block_api_key_available_count ?? 0),
total: formatNumber(status.value?.pre_block_api_key_total_calls ?? 0),
workerActive: formatNumber(status.value?.active_workers ?? 0),
workerTotal: formatNumber(status.value?.worker_count ?? configForm.worker_count),
}))
function preBlockAPIKeyLoadWidth(total: number): string {
return `${Math.min(100, Math.max(0, (total / preBlockAPIKeyMaxTotal.value) * 100)).toFixed(1)}%`
}
const workerSlots = computed(() => { const workerSlots = computed(() => {
const total = Math.max(0, status.value?.worker_count ?? configForm.worker_count) const total = Math.max(0, status.value?.worker_count ?? configForm.worker_count)
const active = Math.max(0, status.value?.active_workers ?? 0) const active = Math.max(0, status.value?.active_workers ?? 0)

View File

@ -58,8 +58,12 @@ vi.mock('vue-i18n', async () => {
return { return {
...actual, ...actual,
useI18n: () => ({ useI18n: () => ({
t: (key: string, params?: Record<string, string | number>) => t: (key: string, params?: Record<string, string | number>) => {
key.replace(/\{(\w+)\}/g, (_, token) => String(params?.[token] ?? `{${token}}`)), if (key === 'admin.riskControl.preBlockAPIKeyLoadSummary') {
return `同步并发 ${params?.active} / 可用 Key ${params?.available},累计 ${params?.total}worker${params?.workerActive} / ${params?.workerTotal}`
}
return key.replace(/\{(\w+)\}/g, (_, token) => String(params?.[token] ?? `{${token}}`))
},
}), }),
} }
}) })
@ -118,6 +122,16 @@ const runtimeStatus = () => ({
dropped: 0, dropped: 0,
processed: 0, processed: 0,
errors: 0, errors: 0,
pre_block_active: 0,
pre_block_checked: 0,
pre_block_allowed: 0,
pre_block_blocked: 0,
pre_block_errors: 0,
pre_block_avg_latency_ms: 0,
pre_block_api_key_active: 0,
pre_block_api_key_available_count: 0,
pre_block_api_key_total_calls: 0,
pre_block_api_key_loads: [],
api_key_statuses: [], api_key_statuses: [],
flagged_hash_count: 0, flagged_hash_count: 0,
last_cleanup_deleted_hit: 0, last_cleanup_deleted_hit: 0,
@ -261,4 +275,133 @@ describe('admin RiskControlView', () => {
})) }))
expect(showError).not.toHaveBeenCalled() expect(showError).not.toHaveBeenCalled()
}) })
it('describes worker runtime as async audit and pre-block record processing', async () => {
getStatus.mockResolvedValue({
...runtimeStatus(),
mode: 'observe',
processed: 12,
queue_length: 2,
})
const wrapper = mount(RiskControlView, {
global: {
stubs: {
AppLayout: AppLayoutStub,
BaseDialog: BaseDialogStub,
Icon: true,
Select: true,
Toggle: true,
Pagination: true,
ModelWhitelistSelector: ModelWhitelistSelectorStub,
},
},
})
await flushPromises()
expect(wrapper.text()).toContain('admin.riskControl.workerStatusHint')
expect(wrapper.text()).not.toContain('admin.riskControl.preBlockSyncStatus')
expect(wrapper.text()).toContain('admin.riskControl.records')
expect(wrapper.text()).toContain('12')
expect(wrapper.text()).toContain('2 / 32,768')
})
it('shows pre-block synchronous moderation metrics separately from worker queue', async () => {
getStatus.mockResolvedValue({
...runtimeStatus(),
pre_block_active: 2,
pre_block_checked: 128,
pre_block_allowed: 120,
pre_block_blocked: 8,
pre_block_errors: 1,
pre_block_avg_latency_ms: 86,
pre_block_api_key_active: 2,
pre_block_api_key_available_count: 2,
pre_block_api_key_total_calls: 128,
active_workers: 3,
worker_count: 7,
pre_block_api_key_loads: [
{
index: 0,
key_hash: 'hash-one',
masked: 'sk-...one',
status: 'ok',
active: 1,
total: 72,
success: 70,
errors: 2,
avg_latency_ms: 84,
last_latency_ms: 80,
last_http_status: 200,
},
{
index: 1,
key_hash: 'hash-two',
masked: 'sk-...two',
status: 'ok',
active: 1,
total: 56,
success: 56,
errors: 0,
avg_latency_ms: 90,
last_latency_ms: 92,
last_http_status: 200,
},
],
})
const wrapper = mount(RiskControlView, {
global: {
stubs: {
AppLayout: AppLayoutStub,
BaseDialog: BaseDialogStub,
Icon: true,
Select: true,
Toggle: true,
Pagination: true,
ModelWhitelistSelector: ModelWhitelistSelectorStub,
},
},
})
await flushPromises()
expect(wrapper.text()).toContain('admin.riskControl.preBlockSyncStatus')
expect(wrapper.text()).toContain('admin.riskControl.preBlockSyncHint')
expect(wrapper.text()).not.toContain('admin.riskControl.workerStatus')
expect(wrapper.text()).toContain('admin.riskControl.records')
expect(wrapper.text()).toContain('128')
expect(wrapper.text()).toContain('120')
expect(wrapper.text()).toContain('8')
expect(wrapper.text()).toContain('86 ms')
expect(wrapper.text()).toContain('admin.riskControl.preBlockAPIKeyLoad')
expect(wrapper.text()).toContain('sk-...one')
expect(wrapper.text()).toContain('sk-...two')
expect(wrapper.text()).toContain('72')
expect(wrapper.text()).toContain('56')
expect(wrapper.text()).toContain('同步并发 2 / 可用 Key 2累计 128 次worker3 / 7')
const runtimeCards = wrapper.get('[data-test="pre-block-runtime-cards"]')
const syncCard = wrapper.get('[data-test="pre-block-sync-card"]')
const apiKeyLoadCard = wrapper.get('[data-test="pre-block-api-key-load-card"]')
expect(runtimeCards.classes()).toEqual(expect.arrayContaining([
'grid',
'grid-cols-1',
'xl:grid-cols-[minmax(0,520px)_minmax(0,1fr)]',
]))
expect(syncCard.element.parentElement).toBe(runtimeCards.element)
expect(apiKeyLoadCard.element.parentElement).toBe(runtimeCards.element)
expect(syncCard.classes()).toContain('card')
expect(apiKeyLoadCard.classes()).toContain('card')
expect(syncCard.get('h2').text()).toBe('admin.riskControl.preBlockSyncStatus')
expect(syncCard.text()).toContain('admin.riskControl.preBlockSyncHint')
expect(apiKeyLoadCard.get('h2').text()).toBe('admin.riskControl.preBlockAPIKeyLoad')
expect(apiKeyLoadCard.text()).toContain('admin.riskControl.preBlockAPIKeyLoadHint')
expect(wrapper.get('[data-test="pre-block-api-key-load-list"]').classes()).toEqual(expect.arrayContaining([
'max-h-[280px]',
'overflow-y-auto',
]))
})
}) })