From ead471d64bd7ebfc7769c36fa33d0d46bb015f80 Mon Sep 17 00:00:00 2001 From: wucm667 Date: Fri, 29 May 2026 10:38:00 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat(account):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=8C=89=205h/7d=20=E7=94=A8=E9=87=8F=E9=98=88=E5=80=BC?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9A=82=E5=81=9C=E8=B4=A6=E5=8F=B7=E8=B0=83?= =?UTF-8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/repository/scheduler_cache.go | 6 + .../repository/scheduler_cache_unit_test.go | 19 +++ .../service/openai_account_scheduler.go | 2 +- .../service/openai_account_scheduler_test.go | 111 ++++++++++++++ .../service/openai_gateway_service.go | 144 +++++++++++++++++- backend/internal/service/ops_settings.go | 19 +++ .../service/ops_settings_advanced_test.go | 16 ++ .../internal/service/ops_settings_models.go | 28 ++-- backend/internal/service/setting_service.go | 81 ++++++++++ .../components/account/EditAccountModal.vue | 64 ++++++-- .../__tests__/EditAccountModal.spec.ts | 22 +++ frontend/src/i18n/locales/en.ts | 3 + frontend/src/i18n/locales/zh.ts | 3 + 13 files changed, 495 insertions(+), 23 deletions(-) diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index ab01a863..ec8c72dc 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -548,6 +548,12 @@ func filterSchedulerExtra(extra map[string]any) map[string]any { "openai_ws_force_http", "openai_responses_mode", "openai_responses_supported", + "codex_5h_used_percent", + "codex_7d_used_percent", + "auto_pause_5h_threshold", + "auto_pause_7d_threshold", + "auto_pause_5h_limit", + "auto_pause_7d_limit", } filtered := make(map[string]any) for _, key := range keys { diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index 86de87c7..fabc6bad 100644 --- 
a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -75,3 +75,22 @@ func TestBuildSchedulerMetadataAccount_KeepsSlimGroupMembership(t *testing.T) { require.Equal(t, int64(11), got.AccountGroups[1].GroupID) require.Nil(t, got.Groups) } + +func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { + account := service.Account{ + ID: 88, + Extra: map[string]any{ + "codex_5h_used_percent": 12.34, + "codex_7d_used_percent": 56.78, + "auto_pause_5h_threshold": 0.95, + "auto_pause_7d_threshold": 0.96, + }, + } + + got := buildSchedulerMetadataAccount(account) + + require.Equal(t, 12.34, got.Extra["codex_5h_used_percent"]) + require.Equal(t, 56.78, got.Extra["codex_7d_used_percent"]) + require.Equal(t, 0.95, got.Extra["auto_pause_5h_threshold"]) + require.Equal(t, 0.96, got.Extra["auto_pause_7d_threshold"]) +} diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index 1eca08b1..fd28fa86 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -370,7 +370,6 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) return nil, nil } - result, acquireErr := s.service.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if acquireErr == nil && result != nil && result.Acquired { _ = s.service.refreshStickySessionTTL(ctx, req.GroupID, sessionHash, s.service.openAIWSSessionStickyTTL()) @@ -1154,6 +1153,7 @@ func (s *OpenAIGatewayService) selectAccountWithScheduler( requiredImageCapability OpenAIImagesCapability, requireCompact bool, ) (*AccountSelectionResult, OpenAIAccountScheduleDecision, error) { + ctx = s.withOpenAIQuotaAutoPauseContext(ctx) decision := OpenAIAccountScheduleDecision{} scheduler := s.getOpenAIAccountScheduler(ctx) if scheduler == nil { diff --git 
a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index fedf7e9c..37810870 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -691,6 +691,117 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyRateLimite require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) } +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AutoPauseBy5hThreshold(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35001, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 95.0, + "auto_pause_5h_threshold": 0.95, + "auto_pause_5h_limit": 100, + }, + } + secondary := Account{ID: 35002, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35002), account.ID) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AllowsBelow5hThreshold(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35101, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 80.0, + "auto_pause_5h_threshold": 0.95, + "auto_pause_5h_limit": 100, + }, + } + secondary := Account{ID: 35102, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := 
&OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35101), account.ID) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AutoPauseBy7dThreshold(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35201, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_7d_used_percent": 95.0, + "auto_pause_7d_threshold": 0.95, + "auto_pause_7d_limit": 200, + }, + } + secondary := Account{ID: 35202, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35202), account.ID) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UnconfiguredThresholdKeepsLegacyBehavior(t *testing.T) { + ctx := context.Background() + primary := Account{ID: 35301, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0, Extra: map[string]any{"codex_5h_used_percent": 99.0, "codex_7d_used_percent": 99.0}} + secondary := Account{ID: 35302, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := 
svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35301), account.ID) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefaultThreshold(t *testing.T) { + ctx := withOpenAIQuotaAutoPauseSettings(context.Background(), OpsOpenAIAccountQuotaAutoPauseSettings{DefaultThreshold5h: 0.95}) + primary := Account{ + ID: 35401, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 95.0, + "auto_pause_5h_limit": 100, + }, + } + secondary := Account{ID: 35402, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35402), account.ID) +} + func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_SkipsFreshlyRateLimitedSnapshotCandidate(t *testing.T) { ctx := context.Background() groupID := int64(10102) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 623a64e9..268f985c 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1290,7 +1290,7 @@ func (s *OpenAIGatewayService) SelectAccountForModel(ctx context.Context, groupI // SelectAccountForModelWithExclusions selects an account supporting the requested model while excluding specified accounts. 
// SelectAccountForModelWithExclusions 选择支持指定模型的账号,同时排除指定的账号。 func (s *OpenAIGatewayService) SelectAccountForModelWithExclusions(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}) (*Account, error) { - return s.selectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs, false, 0, "") + return s.selectAccountForModelWithExclusions(s.withOpenAIQuotaAutoPauseContext(ctx), groupID, sessionHash, requestedModel, excludedIDs, false, 0, "") } // noAvailableOpenAISelectionError builds the standard "no account available" error @@ -1327,6 +1327,18 @@ func isOpenAIAccountEligibleForRequest(ctx context.Context, account *Account, re if account == nil || !account.IsOpenAI() || !account.IsSchedulableForModelWithContext(ctx, requestedModel) { return false } + if paused, reason := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { + slog.Info("account_auto_paused_by_quota", + "account_id", account.ID, + "usage_5h_percent", readOpenAIQuotaUsedPercent(account.Extra, "5h"), + "usage_7d_percent", readOpenAIQuotaUsedPercent(account.Extra, "7d"), + "threshold_type", reason.window, + "threshold", reason.threshold, + "limit", reason.limit, + "utilization", reason.utilization, + ) + return false + } if requestedModel != "" && !account.IsModelSupported(requestedModel) { return false } @@ -1339,6 +1351,134 @@ func isOpenAIAccountEligibleForRequest(ctx context.Context, account *Account, re return true } +type openAIQuotaAutoPauseDecision struct { + window string + threshold float64 + limit float64 + utilization float64 +} + +func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) (bool, openAIQuotaAutoPauseDecision) { + if account == nil || !account.IsOpenAI() { + return false, openAIQuotaAutoPauseDecision{} + } + threshold5h, threshold7d := resolveOpenAIQuotaAutoPauseThresholds(ctx, account) + if threshold5h > 0 { + if utilization, limit, ok := 
resolveOpenAIQuotaUtilization(account.Extra, "5h"); ok { + if utilization >= threshold5h { + return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, limit: limit, utilization: utilization} + } + } + } + if threshold7d > 0 { + if utilization, limit, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d"); ok { + if utilization >= threshold7d { + return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, limit: limit, utilization: utilization} + } + } + } + return false, openAIQuotaAutoPauseDecision{} +} + +func resolveOpenAIQuotaAutoPauseThresholds(ctx context.Context, account *Account) (float64, float64) { + threshold5h, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_5h_threshold") + threshold7d, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_7d_threshold") + threshold5h = clamp01(threshold5h) + threshold7d = clamp01(threshold7d) + if threshold5h > 0 && threshold7d > 0 { + return threshold5h, threshold7d + } + settings := openAIQuotaAutoPauseSettingsFromContext(ctx) + if threshold5h <= 0 { + threshold5h = clamp01(settings.DefaultThreshold5h) + } + if threshold7d <= 0 { + threshold7d = clamp01(settings.DefaultThreshold7d) + } + return threshold5h, threshold7d +} + +func resolveAccountExtraNumber(extra map[string]any, keys ...string) (float64, bool) { + if len(extra) == 0 { + return 0, false + } + for _, key := range keys { + value, ok := extra[key] + if !ok || value == nil { + continue + } + switch v := value.(type) { + case float64: + return v, true + case float32: + return float64(v), true + case int: + return float64(v), true + case int64: + return float64(v), true + case json.Number: + parsed, err := v.Float64() + if err == nil { + return parsed, true + } + case string: + parsed, err := strconv.ParseFloat(strings.TrimSpace(v), 64) + if err == nil { + return parsed, true + } + } + } + return 0, false +} + +func resolveOpenAIQuotaUtilization(extra map[string]any, window string) (float64, float64, 
bool) { + limitKeys := []string{"auto_pause_" + window + "_limit", "quota_" + window + "_limit", window + "_limit"} + if limit, ok := resolveAccountExtraNumber(extra, limitKeys...); ok && limit > 0 { + if usage, ok := resolveAccountExtraNumber(extra, "usage_"+window); ok && usage >= 0 { + return usage / limit, limit, true + } + } + usedPercent := readOpenAIQuotaUsedPercent(extra, window) + if usedPercent <= 0 { + return 0, 0, false + } + return usedPercent / 100, 100, true +} + +func readOpenAIQuotaUsedPercent(extra map[string]any, window string) float64 { + if len(extra) == 0 { + return 0 + } + if value, ok := resolveAccountExtraNumber(extra, "codex_"+window+"_used_percent"); ok { + return value + } + return 0 +} + +type openAIQuotaAutoPauseCtxKey struct{} + +func withOpenAIQuotaAutoPauseSettings(ctx context.Context, settings OpsOpenAIAccountQuotaAutoPauseSettings) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, openAIQuotaAutoPauseCtxKey{}, settings) +} + +func openAIQuotaAutoPauseSettingsFromContext(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { + if ctx == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + settings, _ := ctx.Value(openAIQuotaAutoPauseCtxKey{}).(OpsOpenAIAccountQuotaAutoPauseSettings) + return settings +} + +func (s *OpenAIGatewayService) withOpenAIQuotaAutoPauseContext(ctx context.Context) context.Context { + if s == nil || s.settingService == nil { + return ctx + } + return withOpenAIQuotaAutoPauseSettings(ctx, s.settingService.GetOpenAIQuotaAutoPauseSettings(ctx)) +} + // prioritizeOpenAICompactAccounts re-orders a slice so that accounts with known // compact support are tried first, followed by unknown, then explicitly unsupported. // The relative order within each tier is preserved. 
@@ -1587,7 +1727,7 @@ func (s *OpenAIGatewayService) isBetterAccount(candidate, current *Account) bool // SelectAccountWithLoadAwareness selects an account with load-awareness and wait plan. func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) { - return s.selectAccountWithLoadAwareness(ctx, groupID, sessionHash, requestedModel, excludedIDs, false, "") + return s.selectAccountWithLoadAwareness(s.withOpenAIQuotaAutoPauseContext(ctx), groupID, sessionHash, requestedModel, excludedIDs, false, "") } func (s *OpenAIGatewayService) selectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool, requiredCapability OpenAIEndpointCapability) (*AccountSelectionResult, error) { diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index 68c1d9dd..23e92e5a 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -369,6 +369,7 @@ func defaultOpsAdvancedSettings() *OpsAdvancedSettings { Aggregation: OpsAggregationSettings{ AggregationEnabled: false, }, + OpenAIAccountQuotaAutoPause: OpsOpenAIAccountQuotaAutoPauseSettings{}, IgnoreCountTokensErrors: true, // count_tokens 404 是预期行为,默认忽略 IgnoreContextCanceled: true, // Default to true - client disconnects are not errors IgnoreNoAvailableAccounts: false, // Default to false - this is a real routing issue @@ -384,6 +385,8 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) { if cfg == nil { return } + cfg.OpenAIAccountQuotaAutoPause.DefaultThreshold5h = clampOpsQuotaAutoPauseThreshold(cfg.OpenAIAccountQuotaAutoPause.DefaultThreshold5h) + cfg.OpenAIAccountQuotaAutoPause.DefaultThreshold7d = clampOpsQuotaAutoPauseThreshold(cfg.OpenAIAccountQuotaAutoPause.DefaultThreshold7d) 
cfg.DataRetention.CleanupSchedule = strings.TrimSpace(cfg.DataRetention.CleanupSchedule) if cfg.DataRetention.CleanupSchedule == "" { cfg.DataRetention.CleanupSchedule = opsCleanupDefaultSchedule @@ -405,6 +408,16 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) { } } +func clampOpsQuotaAutoPauseThreshold(value float64) float64 { + if value <= 0 { + return 0 + } + if value > 1 { + return 1 + } + return value +} + func validateOpsAdvancedSettings(cfg *OpsAdvancedSettings) error { if cfg == nil { return errors.New("invalid config") @@ -477,6 +490,12 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva if err := s.settingRepo.Set(ctx, SettingKeyOpsAdvancedSettings, string(raw)); err != nil { return nil, err } + cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) + openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) + storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ + settings: cfg.OpenAIAccountQuotaAutoPause, + expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), + }) // notify cleanup service to reload schedule/enabled. if s.cleanupReloader != nil { diff --git a/backend/internal/service/ops_settings_advanced_test.go b/backend/internal/service/ops_settings_advanced_test.go index 06cc545b..d8598fe0 100644 --- a/backend/internal/service/ops_settings_advanced_test.go +++ b/backend/internal/service/ops_settings_advanced_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "testing" + + "github.com/Wei-Shaw/sub2api/internal/config" ) func TestGetOpsAdvancedSettings_DefaultHidesOpenAITokenStats(t *testing.T) { @@ -95,3 +97,17 @@ func TestGetOpsAdvancedSettings_BackfillsNewDisplayFlagsFromDefaults(t *testing. 
t.Fatalf("DisplayAlertEvents = false, want true default backfill") } } + +func TestGetOpenAIQuotaAutoPauseSettings_ReadsDefaultsFromOpsAdvancedSettings(t *testing.T) { + repo := newRuntimeSettingRepoStub() + repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.95,"default_threshold_7d":0.9}}` + svc := NewSettingService(repo, &config.Config{}) + + settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if settings.DefaultThreshold5h != 0.95 { + t.Fatalf("DefaultThreshold5h = %v, want 0.95", settings.DefaultThreshold5h) + } + if settings.DefaultThreshold7d != 0.9 { + t.Fatalf("DefaultThreshold7d = %v, want 0.9", settings.DefaultThreshold7d) + } +} diff --git a/backend/internal/service/ops_settings_models.go b/backend/internal/service/ops_settings_models.go index fa18b05f..4d459e2b 100644 --- a/backend/internal/service/ops_settings_models.go +++ b/backend/internal/service/ops_settings_models.go @@ -92,17 +92,23 @@ type OpsAlertRuntimeSettings struct { // OpsAdvancedSettings stores advanced ops configuration (data retention, aggregation). 
type OpsAdvancedSettings struct { - DataRetention OpsDataRetentionSettings `json:"data_retention"` - Aggregation OpsAggregationSettings `json:"aggregation"` - IgnoreCountTokensErrors bool `json:"ignore_count_tokens_errors"` - IgnoreContextCanceled bool `json:"ignore_context_canceled"` - IgnoreNoAvailableAccounts bool `json:"ignore_no_available_accounts"` - IgnoreInvalidApiKeyErrors bool `json:"ignore_invalid_api_key_errors"` - IgnoreInsufficientBalanceErrors bool `json:"ignore_insufficient_balance_errors"` - DisplayOpenAITokenStats bool `json:"display_openai_token_stats"` - DisplayAlertEvents bool `json:"display_alert_events"` - AutoRefreshEnabled bool `json:"auto_refresh_enabled"` - AutoRefreshIntervalSec int `json:"auto_refresh_interval_seconds"` + DataRetention OpsDataRetentionSettings `json:"data_retention"` + Aggregation OpsAggregationSettings `json:"aggregation"` + OpenAIAccountQuotaAutoPause OpsOpenAIAccountQuotaAutoPauseSettings `json:"openai_account_quota_auto_pause"` + IgnoreCountTokensErrors bool `json:"ignore_count_tokens_errors"` + IgnoreContextCanceled bool `json:"ignore_context_canceled"` + IgnoreNoAvailableAccounts bool `json:"ignore_no_available_accounts"` + IgnoreInvalidApiKeyErrors bool `json:"ignore_invalid_api_key_errors"` + IgnoreInsufficientBalanceErrors bool `json:"ignore_insufficient_balance_errors"` + DisplayOpenAITokenStats bool `json:"display_openai_token_stats"` + DisplayAlertEvents bool `json:"display_alert_events"` + AutoRefreshEnabled bool `json:"auto_refresh_enabled"` + AutoRefreshIntervalSec int `json:"auto_refresh_interval_seconds"` +} + +type OpsOpenAIAccountQuotaAutoPauseSettings struct { + DefaultThreshold5h float64 `json:"default_threshold_5h"` + DefaultThreshold7d float64 `json:"default_threshold_7d"` } type OpsDataRetentionSettings struct { diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 08c0d045..40e98b88 100644 --- a/backend/internal/service/setting_service.go 
+++ b/backend/internal/service/setting_service.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -137,6 +138,11 @@ type cachedOpenAICodexUserAgent struct { expiresAt int64 // unix nano } +type cachedOpenAIQuotaAutoPauseSettings struct { + settings OpsOpenAIAccountQuotaAutoPauseSettings + expiresAt int64 +} + const openAICodexUserAgentCacheTTL = 60 * time.Second const openAICodexUserAgentErrorTTL = 5 * time.Second const openAICodexUserAgentDBTimeout = 5 * time.Second @@ -152,6 +158,33 @@ const openAIAllowCodexPluginCacheTTL = 60 * time.Second const openAIAllowCodexPluginErrorTTL = 5 * time.Second const openAIAllowCodexPluginDBTimeout = 5 * time.Second +const openAIQuotaAutoPauseSettingsCacheTTL = 60 * time.Second +const openAIQuotaAutoPauseSettingsErrorTTL = 5 * time.Second +const openAIQuotaAutoPauseSettingsDBTimeout = 5 * time.Second + +var openAIQuotaAutoPauseSettingsCache sync.Map // map[string]*cachedOpenAIQuotaAutoPauseSettings +var openAIQuotaAutoPauseSettingsSF singleflight.Group + +func openAIQuotaAutoPauseSettingsCacheKey(repo SettingRepository) string { + if repo == nil { + return "nil" + } + return fmt.Sprintf("%T:%p", repo, repo) +} + +func loadOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository) (*cachedOpenAIQuotaAutoPauseSettings, bool) { + value, ok := openAIQuotaAutoPauseSettingsCache.Load(openAIQuotaAutoPauseSettingsCacheKey(repo)) + if !ok || value == nil { + return nil, false + } + cached, ok := value.(*cachedOpenAIQuotaAutoPauseSettings) + return cached, ok && cached != nil +} + +func storeOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository, cached *cachedOpenAIQuotaAutoPauseSettings) { + openAIQuotaAutoPauseSettingsCache.Store(openAIQuotaAutoPauseSettingsCacheKey(repo), cached) +} + // DefaultSubscriptionGroupReader validates group references used by default subscriptions. 
type DefaultSubscriptionGroupReader interface { GetByID(ctx context.Context, id int64) (*Group, error) @@ -2027,6 +2060,9 @@ func (s *SettingService) refreshCachedSettings(settings *SystemSettings) { enabled: settings.OpenAIAdvancedSchedulerEnabled, expiresAt: time.Now().Add(openAIAdvancedSchedulerSettingCacheTTL).UnixNano(), }) + cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) + openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) + openAIQuotaAutoPauseSettingsCache.Delete(cacheKey) if s.cfg != nil { s.cfg.SetTrustForwardedIPForAPIKeyACL(settings.APIKeyACLTrustForwardedIP) } @@ -4448,6 +4484,51 @@ func (s *SettingService) GetClaudeCodeVersionBounds(ctx context.Context) (min, m return b.min, b.max } +func (s *SettingService) GetOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { + if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { + if time.Now().UnixNano() < cached.expiresAt { + return cached.settings + } + } + + cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) + result, _, _ := openAIQuotaAutoPauseSettingsSF.Do(cacheKey, func() (any, error) { + if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { + if time.Now().UnixNano() < cached.expiresAt { + return cached.settings, nil + } + } + + settings := OpsOpenAIAccountQuotaAutoPauseSettings{} + ttl := openAIQuotaAutoPauseSettingsCacheTTL + if s != nil && s.settingRepo != nil { + dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) + defer cancel() + raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) + if err == nil { + cfg := defaultOpsAdvancedSettings() + if strings.TrimSpace(raw) != "" { + if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { + normalizeOpsAdvancedSettings(cfg) + } + } + settings = cfg.OpenAIAccountQuotaAutoPause + } else { + ttl = openAIQuotaAutoPauseSettingsErrorTTL + } + } + + 
storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(ttl).UnixNano(), + }) + return settings, nil + }) + + settings, _ := result.(OpsOpenAIAccountQuotaAutoPauseSettings) + return settings +} + // GetRectifierSettings 获取请求整流器配置 func (s *SettingService) GetRectifierSettings(ctx context.Context) (*RectifierSettings, error) { value, err := s.settingRepo.GetValue(ctx, SettingKeyRectifierSettings) diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 515ba58f..470c0bfb 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -1787,6 +1787,38 @@ +
+
+ + +

{{ t('admin.accounts.autoPauseThresholdHint') }}

+
+
+ + +

{{ t('admin.accounts.autoPauseThresholdHint') }}

+
+
+
([]) const customErrorCodeInput = ref(null) const interceptWarmupRequests = ref(false) const autoPauseOnExpired = ref(false) +const autoPause5hThreshold = ref(null) +const autoPause7dThreshold = ref(null) const mixedScheduling = ref(false) // For antigravity accounts: enable mixed scheduling const allowOverages = ref(false) // For antigravity accounts: enable AI Credits overages const antigravityModelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist') @@ -2862,9 +2896,11 @@ const syncFormFromAccount = (newAccount: Account | null) => { // Load mixed scheduling setting (only for antigravity accounts) mixedScheduling.value = false allowOverages.value = false - const extra = newAccount.extra as Record | undefined - mixedScheduling.value = extra?.mixed_scheduling === true - allowOverages.value = extra?.allow_overages === true + const extra = newAccount.extra as Record | undefined + mixedScheduling.value = extra?.mixed_scheduling === true + allowOverages.value = extra?.allow_overages === true + autoPause5hThreshold.value = typeof extra?.auto_pause_5h_threshold === 'number' ? extra.auto_pause_5h_threshold * 100 : null + autoPause7dThreshold.value = typeof extra?.auto_pause_7d_threshold === 'number' ? 
extra.auto_pause_7d_threshold * 100 : null // Load OpenAI passthrough toggle (OpenAI OAuth/API Key) openaiPassthroughEnabled.value = false @@ -3987,9 +4023,9 @@ const handleSubmit = async () => { } // For OpenAI OAuth/API Key accounts, handle passthrough mode in extra - if (props.account.platform === 'openai' && (props.account.type === 'oauth' || props.account.type === 'apikey')) { - const currentExtra = (props.account.extra as Record) || {} - const newExtra: Record = { ...currentExtra } + if (props.account.platform === 'openai' && (props.account.type === 'oauth' || props.account.type === 'apikey')) { + const currentExtra = (props.account.extra as Record) || {} + const newExtra: Record = { ...currentExtra } const hadCodexCLIOnlyEnabled = currentExtra.codex_cli_only === true if (props.account.type === 'oauth') { newExtra.openai_oauth_responses_websockets_v2_mode = openaiOAuthResponsesWebSocketV2Mode.value @@ -4011,15 +4047,25 @@ const handleSubmit = async () => { } else { newExtra.openai_compact_mode = openAICompactMode.value } - if (props.account.type === 'apikey') { + if (props.account.type === 'apikey') { if (!openAITextGenerationCapabilityEnabled.value || openAIResponsesMode.value === 'auto') { delete newExtra.openai_responses_mode } else { newExtra.openai_responses_mode = openAIResponsesMode.value } - } + } + if (autoPause5hThreshold.value != null && autoPause5hThreshold.value > 0) { + newExtra.auto_pause_5h_threshold = autoPause5hThreshold.value / 100 + } else { + delete newExtra.auto_pause_5h_threshold + } + if (autoPause7dThreshold.value != null && autoPause7dThreshold.value > 0) { + newExtra.auto_pause_7d_threshold = autoPause7dThreshold.value / 100 + } else { + delete newExtra.auto_pause_7d_threshold + } - delete newExtra.codex_image_generation_bridge_enabled + delete newExtra.codex_image_generation_bridge_enabled if (codexImageGenerationBridgeMode.value === 'inherit') { delete newExtra.codex_image_generation_bridge } else { diff --git 
a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts index 4561924f..6db63831 100644 --- a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts +++ b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts @@ -330,6 +330,28 @@ describe('EditAccountModal', () => { ]) }) + it('submits OpenAI quota auto-pause thresholds in extra', async () => { + const account = buildAccount() + account.extra = { + auto_pause_5h_threshold: 0.9, + auto_pause_7d_threshold: 0.8 + } + updateAccountMock.mockReset() + checkMixedChannelRiskMock.mockReset() + checkMixedChannelRiskMock.mockResolvedValue({ has_risk: false }) + updateAccountMock.mockResolvedValue(account) + + const wrapper = mountModal(account) + + await wrapper.get('[data-testid="auto-pause-5h-threshold"]').setValue('95') + await wrapper.get('[data-testid="auto-pause-7d-threshold"]').setValue('96') + await wrapper.get('form#edit-account-form').trigger('submit.prevent') + + expect(updateAccountMock).toHaveBeenCalledTimes(1) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_5h_threshold).toBe(0.95) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_7d_threshold).toBe(0.96) + }) + it('keeps at least one OpenAI APIKey endpoint capability selected', async () => { const account = buildAccount() updateAccountMock.mockReset() diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index b2aeb2f8..fa2e5a92 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -3475,6 +3475,9 @@ export default { 'When enabled, warmup requests like title generation will return mock responses without consuming upstream tokens', autoPauseOnExpired: 'Auto Pause On Expired', autoPauseOnExpiredDesc: 'When enabled, the account will auto pause scheduling after it expires', + autoPause5hThreshold: '5h Usage Threshold (%)', + autoPause7dThreshold: '7d Usage Threshold (%)', 
+ autoPauseThresholdHint: 'Leave empty or set 0 to disable. Reaching the threshold only skips the account during scheduling and does not modify schedulable.', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: 'Quota Control', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 85d1feee..2364f9c4 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -3613,6 +3613,9 @@ export default { interceptWarmupRequestsDesc: '启用后,标题生成等预热请求将返回 mock 响应,不消耗上游 token', autoPauseOnExpired: '过期自动暂停调度', autoPauseOnExpiredDesc: '启用后,账号过期将自动暂停调度', + autoPause5hThreshold: '5h 用量阈值(%)', + autoPause7dThreshold: '7d 用量阈值(%)', + autoPauseThresholdHint: '填 0 或留空表示不启用;达到阈值后仅在调度时跳过账号,不修改 schedulable。', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: '配额控制', From 8b7a8227060e3eac20b5c0e331b50b2dc99f5e13 Mon Sep 17 00:00:00 2001 From: wucm667 Date: Fri, 29 May 2026 12:20:30 +0800 Subject: [PATCH 2/3] fix(account): address review on OpenAI quota auto-pause - gate previous_response_id sticky path with quota auto-pause check at both the snapshot and DB-recheck stages (previously bypassed, #1) - skip pausing when the usage window already reset to avoid a stale stuck-pause; carry codex_*_reset_at / reset_after_seconds / codex_usage_updated_at through the scheduler snapshot whitelist (#2) - remove the incomplete limit mode; percentage threshold only (#3) - add global default 5h/7d threshold inputs to the Ops settings dialog with validation and en/zh i18n (#4) - downgrade account_auto_paused_by_quota log from Info to Debug; it fires per-candidate on the scheduling hot path (#5) Co-Authored-By: Claude Opus 4.8 --- .../internal/repository/scheduler_cache.go | 7 +- .../repository/scheduler_cache_unit_test.go | 18 +++-- .../service/openai_account_scheduler_test.go | 58 +++++++++++++-- .../service/openai_gateway_service.go | 72 +++++++++++++------ .../service/openai_ws_account_sticky_test.go | 40 
+++++++++++ .../internal/service/openai_ws_forwarder.go | 10 +++ frontend/src/api/admin/ops.ts | 6 ++ frontend/src/i18n/locales/en.ts | 8 ++- frontend/src/i18n/locales/zh.ts | 8 ++- .../ops/components/OpsSettingsDialog.vue | 65 +++++++++++++++++ 10 files changed, 257 insertions(+), 35 deletions(-) diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index ec8c72dc..ff3c4301 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -550,10 +550,13 @@ func filterSchedulerExtra(extra map[string]any) map[string]any { "openai_responses_supported", "codex_5h_used_percent", "codex_7d_used_percent", + "codex_5h_reset_at", + "codex_7d_reset_at", + "codex_5h_reset_after_seconds", + "codex_7d_reset_after_seconds", + "codex_usage_updated_at", "auto_pause_5h_threshold", "auto_pause_7d_threshold", - "auto_pause_5h_limit", - "auto_pause_7d_limit", } filtered := make(map[string]any) for _, key := range keys { diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index fabc6bad..9e4ec23e 100644 --- a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -80,10 +80,15 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { account := service.Account{ ID: 88, Extra: map[string]any{ - "codex_5h_used_percent": 12.34, - "codex_7d_used_percent": 56.78, - "auto_pause_5h_threshold": 0.95, - "auto_pause_7d_threshold": 0.96, + "codex_5h_used_percent": 12.34, + "codex_7d_used_percent": 56.78, + "codex_5h_reset_at": "2026-05-29T10:00:00Z", + "codex_7d_reset_at": "2026-06-01T10:00:00Z", + "codex_5h_reset_after_seconds": 300, + "codex_7d_reset_after_seconds": 600, + "codex_usage_updated_at": "2026-05-29T09:00:00Z", + "auto_pause_5h_threshold": 0.95, + "auto_pause_7d_threshold": 0.96, }, } @@ -91,6 +96,11 @@ func 
TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { require.Equal(t, 12.34, got.Extra["codex_5h_used_percent"]) require.Equal(t, 56.78, got.Extra["codex_7d_used_percent"]) + require.Equal(t, "2026-05-29T10:00:00Z", got.Extra["codex_5h_reset_at"]) + require.Equal(t, "2026-06-01T10:00:00Z", got.Extra["codex_7d_reset_at"]) + require.Equal(t, 300, got.Extra["codex_5h_reset_after_seconds"]) + require.Equal(t, 600, got.Extra["codex_7d_reset_after_seconds"]) + require.Equal(t, "2026-05-29T09:00:00Z", got.Extra["codex_usage_updated_at"]) require.Equal(t, 0.95, got.Extra["auto_pause_5h_threshold"]) require.Equal(t, 0.96, got.Extra["auto_pause_7d_threshold"]) } diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 37810870..531769a7 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -704,7 +704,6 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AutoPauseBy5hT Extra: map[string]any{ "codex_5h_used_percent": 95.0, "auto_pause_5h_threshold": 0.95, - "auto_pause_5h_limit": 100, }, } secondary := Account{ID: 35002, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} @@ -729,7 +728,6 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AllowsBelow5hT Extra: map[string]any{ "codex_5h_used_percent": 80.0, "auto_pause_5h_threshold": 0.95, - "auto_pause_5h_limit": 100, }, } secondary := Account{ID: 35102, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} @@ -754,7 +752,6 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_AutoPauseBy7dT Extra: map[string]any{ "codex_7d_used_percent": 95.0, "auto_pause_7d_threshold": 0.95, - "auto_pause_7d_limit": 200, }, } secondary := Account{ID: 35202, Platform: 
PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} @@ -790,7 +787,6 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefa Priority: 0, Extra: map[string]any{ "codex_5h_used_percent": 95.0, - "auto_pause_5h_limit": 100, }, } secondary := Account{ID: 35402, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} @@ -802,6 +798,60 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefa require.Equal(t, int64(35402), account.ID) } +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_StaleUsageWindowResetSkipsPause(t *testing.T) { + ctx := context.Background() + // Usage is over threshold but the window's reset time has already passed, so the + // cached percentage is stale (the real window rolled over) and the account must NOT + // stay paused — otherwise it could be skipped forever with no traffic to refresh it. 
+ primary := Account{ + ID: 35501, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "auto_pause_5h_threshold": 0.95, + "codex_5h_reset_at": time.Now().Add(-time.Minute).Format(time.RFC3339), + }, + } + secondary := Account{ID: 35502, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35501), account.ID) +} + +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_FreshUsageWindowStillPauses(t *testing.T) { + ctx := context.Background() + // Same as above but the window has not reset yet, so the account stays paused. 
+ primary := Account{ + ID: 35601, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "auto_pause_5h_threshold": 0.95, + "codex_5h_reset_at": time.Now().Add(time.Hour).Format(time.RFC3339), + }, + } + secondary := Account{ID: 35602, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35602), account.ID) +} + func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_SkipsFreshlyRateLimitedSnapshotCandidate(t *testing.T) { ctx := context.Background() groupID := int64(10102) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 268f985c..e2534cc2 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1328,13 +1328,12 @@ func isOpenAIAccountEligibleForRequest(ctx context.Context, account *Account, re return false } if paused, reason := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { - slog.Info("account_auto_paused_by_quota", + // Debug level: this fires per-candidate on the scheduling hot path, so Info + // would amplify into log spam once several accounts cross the threshold. 
+ slog.Debug("account_auto_paused_by_quota", "account_id", account.ID, - "usage_5h_percent", readOpenAIQuotaUsedPercent(account.Extra, "5h"), - "usage_7d_percent", readOpenAIQuotaUsedPercent(account.Extra, "7d"), - "threshold_type", reason.window, + "window", reason.window, "threshold", reason.threshold, - "limit", reason.limit, "utilization", reason.utilization, ) return false @@ -1354,7 +1353,6 @@ func isOpenAIAccountEligibleForRequest(ctx context.Context, account *Account, re type openAIQuotaAutoPauseDecision struct { window string threshold float64 - limit float64 utilization float64 } @@ -1363,18 +1361,15 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) return false, openAIQuotaAutoPauseDecision{} } threshold5h, threshold7d := resolveOpenAIQuotaAutoPauseThresholds(ctx, account) + now := time.Now() if threshold5h > 0 { - if utilization, limit, ok := resolveOpenAIQuotaUtilization(account.Extra, "5h"); ok { - if utilization >= threshold5h { - return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, limit: limit, utilization: utilization} - } + if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "5h", now); ok && utilization >= threshold5h { + return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, utilization: utilization} } } if threshold7d > 0 { - if utilization, limit, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d"); ok { - if utilization >= threshold7d { - return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, limit: limit, utilization: utilization} - } + if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d", now); ok && utilization >= threshold7d { + return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, utilization: utilization} } } return false, openAIQuotaAutoPauseDecision{} @@ -1431,18 +1426,49 @@ func resolveAccountExtraNumber(extra map[string]any, keys ...string) (float64, b return 
0, false } -func resolveOpenAIQuotaUtilization(extra map[string]any, window string) (float64, float64, bool) { - limitKeys := []string{"auto_pause_" + window + "_limit", "quota_" + window + "_limit", window + "_limit"} - if limit, ok := resolveAccountExtraNumber(extra, limitKeys...); ok && limit > 0 { - if usage, ok := resolveAccountExtraNumber(extra, "usage_"+window); ok && usage >= 0 { - return usage / limit, limit, true - } - } +// resolveOpenAIQuotaUtilization returns the current utilization ratio (0..1) for the +// given Codex usage window. ok=false means there is no usable signal to pause on: +// either no snapshot exists, or the window has already rolled over so the cached +// percentage is stale. The stale guard matters because a paused account stops +// receiving requests, so its snapshot is never refreshed from upstream headers — +// without this check an old used_percent would keep the account paused forever even +// after the real window reset. +func resolveOpenAIQuotaUtilization(extra map[string]any, window string, now time.Time) (float64, bool) { usedPercent := readOpenAIQuotaUsedPercent(extra, window) if usedPercent <= 0 { - return 0, 0, false + return 0, false } - return usedPercent / 100, 100, true + if openAIQuotaWindowReset(extra, window, now) { + return 0, false + } + return usedPercent / 100, true +} + +// openAIQuotaWindowReset reports whether the Codex usage window's reset time has +// already passed relative to now. It prefers the absolute codex_<window>_reset_at +// timestamp and falls back to codex_<window>_reset_after_seconds anchored at +// codex_usage_updated_at, mirroring AccountUsageService's window-progress logic. 
+func openAIQuotaWindowReset(extra map[string]any, window string, now time.Time) bool { + if len(extra) == 0 { + return false + } + if resetAtRaw, ok := extra["codex_"+window+"_reset_at"]; ok { + if resetAt, err := parseTime(fmt.Sprint(resetAtRaw)); err == nil { + return !now.Before(resetAt) + } + } + resetAfter := parseExtraInt(extra["codex_"+window+"_reset_after_seconds"]) + if resetAfter <= 0 { + return false + } + base := now + if updatedRaw, ok := extra["codex_usage_updated_at"]; ok { + if updatedAt, err := parseTime(fmt.Sprint(updatedRaw)); err == nil { + base = updatedAt + } + } + resetAt := base.Add(time.Duration(resetAfter) * time.Second) + return !now.Before(resetAt) } func readOpenAIQuotaUsedPercent(extra map[string]any, window string) float64 { diff --git a/backend/internal/service/openai_ws_account_sticky_test.go b/backend/internal/service/openai_ws_account_sticky_test.go index c8b28a46..6fc44298 100644 --- a/backend/internal/service/openai_ws_account_sticky_test.go +++ b/backend/internal/service/openai_ws_account_sticky_test.go @@ -48,6 +48,46 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_Hit(t *testing.T } } +func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_QuotaAutoPausedMiss(t *testing.T) { + ctx := context.Background() + groupID := int64(23) + account := Account{ + ID: 77, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 2, + Extra: map[string]any{ + "openai_apikey_responses_websockets_v2_enabled": true, + "codex_5h_used_percent": 96.0, + "auto_pause_5h_threshold": 0.95, + }, + } + cache := &stubGatewayCache{} + store := NewOpenAIWSStateStore(cache) + cfg := newOpenAIWSV2TestConfig() + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + cache: cache, + cfg: cfg, + concurrencyService: NewConcurrencyService(stubConcurrencyCache{}), + openaiWSStateStore: store, + } + + require.NoError(t, 
store.BindResponseAccount(ctx, groupID, "resp_prev_quota", account.ID, time.Hour)) + + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_quota", "gpt-5.1", nil, false) + require.NoError(t, err) + require.Nil(t, selection, "超过 5h 配额阈值的账号不应继续命中 previous_response_id 粘连") + + // Auto-pause is transient, so the binding is preserved: the chain can resume on the + // same account once the quota window resets. + boundAccountID, getErr := store.GetResponseAccount(ctx, groupID, "resp_prev_quota") + require.NoError(t, getErr) + require.Equal(t, account.ID, boundAccountID) +} + func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_RateLimitedMiss(t *testing.T) { ctx := context.Background() groupID := int64(23) diff --git a/backend/internal/service/openai_ws_forwarder.go b/backend/internal/service/openai_ws_forwarder.go index 6eea0191..878ff486 100644 --- a/backend/internal/service/openai_ws_forwarder.go +++ b/backend/internal/service/openai_ws_forwarder.go @@ -4045,6 +4045,13 @@ func (s *OpenAIGatewayService) selectAccountByPreviousResponseIDForCapability( if !account.SupportsOpenAIEndpointCapability(requiredCapability) { return nil, nil } + // Quota auto-pause must also gate the previous_response_id sticky path; otherwise an + // account over its 5h/7d threshold keeps serving the same response chain even though + // normal scheduling skips it. Pause is transient, so fall through to normal scheduling + // without deleting the binding (the window may reset before the next turn). 
+ if paused, _ := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { + return nil, nil + } if s.schedulerSnapshot != nil && s.accountRepo != nil { latest, latestErr := s.accountRepo.GetByID(ctx, account.ID) if latestErr != nil || latest == nil { @@ -4061,6 +4068,9 @@ func (s *OpenAIGatewayService) selectAccountByPreviousResponseIDForCapability( if !latest.SupportsOpenAIEndpointCapability(requiredCapability) { return nil, nil } + if paused, _ := shouldAutoPauseOpenAIAccountByQuota(ctx, latest); paused { + return nil, nil + } if s.isOpenAIAccountRuntimeBlocked(latest) { _ = store.DeleteResponseAccount(ctx, derefGroupID(groupID), responseID) return nil, nil diff --git a/frontend/src/api/admin/ops.ts b/frontend/src/api/admin/ops.ts index 69235668..847fc8c9 100644 --- a/frontend/src/api/admin/ops.ts +++ b/frontend/src/api/admin/ops.ts @@ -778,9 +778,15 @@ export interface OpsAlertRuntimeSettings { thresholds: OpsMetricThresholds // 指标阈值配置 } +export interface OpsOpenAIAccountQuotaAutoPauseSettings { + default_threshold_5h: number // 0~1,0 表示不启用全局默认 5h 阈值 + default_threshold_7d: number // 0~1,0 表示不启用全局默认 7d 阈值 +} + export interface OpsAdvancedSettings { data_retention: OpsDataRetentionSettings aggregation: OpsAggregationSettings + openai_account_quota_auto_pause: OpsOpenAIAccountQuotaAutoPauseSettings ignore_count_tokens_errors: boolean ignore_context_canceled: boolean ignore_no_available_accounts: boolean diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index fa2e5a92..8ab90961 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -5193,6 +5193,11 @@ export default { aggregation: 'Pre-aggregation Tasks', enableAggregation: 'Enable Pre-aggregation', aggregationHint: 'Pre-aggregation improves query performance for long time windows', + openaiQuotaAutoPause: 'OpenAI Account Quota Auto-pause', + openaiQuotaAutoPauseHint: 'When an OpenAI account reaches its 5h / 7d usage threshold, the scheduler skips it 
automatically and resumes once the window rolls over. Per-account thresholds take precedence over this global default.', + openaiQuotaAutoPauseDefault5h: 'Default 5h usage threshold (%)', + openaiQuotaAutoPauseDefault7d: 'Default 7d usage threshold (%)', + openaiQuotaAutoPauseThresholdHint: 'Value 0-100; leave blank or 0 to disable the global default threshold.', errorFiltering: 'Error Filtering', ignoreCountTokensErrors: 'Ignore count_tokens errors', ignoreCountTokensErrorsHint: 'When enabled, errors from count_tokens requests will not be written to the error log.', @@ -5223,7 +5228,8 @@ export default { slaMinPercentRange: 'SLA minimum percentage must be between 0 and 100', ttftP99MaxRange: 'TTFT P99 maximum must be a number ≥ 0', requestErrorRateMaxRange: 'Request error rate maximum must be between 0 and 100', - upstreamErrorRateMaxRange: 'Upstream error rate maximum must be between 0 and 100' + upstreamErrorRateMaxRange: 'Upstream error rate maximum must be between 0 and 100', + openaiQuotaAutoPauseRange: 'OpenAI quota auto-pause threshold must be between 0 and 100' } }, concurrency: { diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 2364f9c4..4f1d1f13 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -5352,6 +5352,11 @@ export default { aggregation: '预聚合任务', enableAggregation: '启用预聚合任务', aggregationHint: '预聚合可提升长时间窗口查询性能', + openaiQuotaAutoPause: 'OpenAI 账号配额自动暂停', + openaiQuotaAutoPauseHint: '当 OpenAI 账号 5h / 7d 用量达到阈值时,调度会自动跳过该账号;窗口滚动后自动恢复。账号级阈值优先于此全局默认值。', + openaiQuotaAutoPauseDefault5h: '默认 5h 用量阈值 (%)', + openaiQuotaAutoPauseDefault7d: '默认 7d 用量阈值 (%)', + openaiQuotaAutoPauseThresholdHint: '取值 0-100,留空或 0 表示不启用全局默认阈值。', errorFiltering: '错误过滤', ignoreCountTokensErrors: '忽略 count_tokens 错误', ignoreCountTokensErrorsHint: '启用后,count_tokens 请求的错误将不会写入错误日志。', @@ -5383,7 +5388,8 @@ export default { slaMinPercentRange: 'SLA最低百分比必须在0-100之间', ttftP99MaxRange: 'TTFT P99最大值必须大于等于0', 
requestErrorRateMaxRange: '请求错误率最大值必须在0-100之间', - upstreamErrorRateMaxRange: '上游错误率最大值必须在0-100之间' + upstreamErrorRateMaxRange: '上游错误率最大值必须在0-100之间', + openaiQuotaAutoPauseRange: 'OpenAI 配额自动暂停阈值必须在 0-100 之间' } }, concurrency: { diff --git a/frontend/src/views/admin/ops/components/OpsSettingsDialog.vue b/frontend/src/views/admin/ops/components/OpsSettingsDialog.vue index 5dba5b1d..bfb7a65f 100644 --- a/frontend/src/views/admin/ops/components/OpsSettingsDialog.vue +++ b/frontend/src/views/admin/ops/components/OpsSettingsDialog.vue @@ -50,6 +50,10 @@ async function loadAllSettings() { runtimeSettings.value = runtime emailConfig.value = email advancedSettings.value = advanced + // 兼容旧 payload:后端未返回该字段时补默认值,保证表单可绑定 + if (advancedSettings.value && !advancedSettings.value.openai_account_quota_auto_pause) { + advancedSettings.value.openai_account_quota_auto_pause = { default_threshold_5h: 0, default_threshold_7d: 0 } + } // 如果后端返回了阈值,使用后端的值;否则保持默认值 if (thresholds && Object.keys(thresholds).length > 0) { metricThresholds.value = { @@ -119,6 +123,28 @@ function removeRecipient(target: 'alert' | 'report', email: string) { if (idx >= 0) list.splice(idx, 1) } +// OpenAI 账号配额自动暂停:后端按 0~1 分数存储,UI 按百分比(0~100)展示 +const quotaAutoPause5hPercent = computed({ + get() { + const v = advancedSettings.value?.openai_account_quota_auto_pause?.default_threshold_5h + return v && v > 0 ? Math.round(v * 1000) / 10 : null + }, + set(val) { + if (!advancedSettings.value?.openai_account_quota_auto_pause) return + advancedSettings.value.openai_account_quota_auto_pause.default_threshold_5h = val != null && val > 0 ? val / 100 : 0 + } +}) +const quotaAutoPause7dPercent = computed({ + get() { + const v = advancedSettings.value?.openai_account_quota_auto_pause?.default_threshold_7d + return v && v > 0 ? 
Math.round(v * 1000) / 10 : null + }, + set(val) { + if (!advancedSettings.value?.openai_account_quota_auto_pause) return + advancedSettings.value.openai_account_quota_auto_pause.default_threshold_7d = val != null && val > 0 ? val / 100 : 0 + } +}) + // 验证 const validation = computed(() => { const errors: string[] = [] @@ -145,6 +171,11 @@ const validation = computed(() => { if (hourly_metrics_retention_days < 0 || hourly_metrics_retention_days > 365) { errors.push(t('admin.ops.settings.validation.retentionDaysRange')) } + + const { default_threshold_5h, default_threshold_7d } = advancedSettings.value.openai_account_quota_auto_pause + if (default_threshold_5h < 0 || default_threshold_5h > 1 || default_threshold_7d < 0 || default_threshold_7d > 1) { + errors.push(t('admin.ops.settings.validation.openaiQuotaAutoPauseRange')) + } } // 验证指标阈值 @@ -473,6 +504,40 @@ async function saveAllSettings() {
+ +
+
{{ t('admin.ops.settings.openaiQuotaAutoPause') }}
+

{{ t('admin.ops.settings.openaiQuotaAutoPauseHint') }}

+ +
+
+ + +
+
+ + +
+
+

{{ t('admin.ops.settings.openaiQuotaAutoPauseThresholdHint') }}

+
+
{{ t('admin.ops.settings.errorFiltering') }}
From c9caadb3782a4e658ce46789df092d0b15d58172 Mon Sep 17 00:00:00 2001 From: wucm667 Date: Fri, 29 May 2026 14:32:45 +0800 Subject: [PATCH 3/3] fix(account): address second-round review on quota auto-pause - TopK initial filter now drops quota-paused accounts: fold the quota check into isAccountRequestCompatible so session-hash, TopK pool, and per-candidate rechecks all skip paused accounts. Previously the candidate pool was built without the quota check, so paused accounts could fill TopK and leave the scheduler returning "no available accounts" even with healthy ones available. - Add per-account explicit disable flags auto_pause_5h_disabled / auto_pause_7d_disabled with toggles in EditAccountModal. Without these, leaving the account threshold blank silently falls back to the global default, so admins could not exempt a single account once a global default existed. Disable is per-window: an account can opt out of 5h auto-pause while still honoring 7d. Schedule snapshot whitelist includes the new fields, i18n EN/ZH updated, threshold-hint text revised to explain "blank = global default". - Move quota auto-pause settings off the request hot path: replace the per-repo TTL+singleflight sync DB read with a per-SettingService stale-while-revalidate in-memory snapshot. Get is non-blocking (atomic.Pointer load + async refresh on staleness); writes via UpdateOpsAdvancedSettings push directly into the cache through an injected sink; wire warms the cache at startup. Adds Warm (sync) for tests/init and SetOpenAIQuotaAutoPauseSettings (sink target). 
Co-Authored-By: Claude Opus 4.7 --- backend/cmd/server/wire_gen.go | 2 +- .../internal/repository/scheduler_cache.go | 2 + .../repository/scheduler_cache_unit_test.go | 4 + .../service/openai_account_scheduler.go | 7 + .../service/openai_account_scheduler_test.go | 136 ++++++++++++++ .../service/openai_gateway_service.go | 44 ++++- backend/internal/service/ops_service.go | 15 ++ backend/internal/service/ops_settings.go | 12 +- .../service/ops_settings_advanced_test.go | 50 ++++- backend/internal/service/setting_service.go | 176 +++++++++++------- backend/internal/service/wire.go | 42 ++++- .../components/account/EditAccountModal.vue | 60 ++++++ .../__tests__/EditAccountModal.spec.ts | 21 +++ frontend/src/i18n/locales/en.ts | 5 +- frontend/src/i18n/locales/zh.ts | 5 +- 15 files changed, 505 insertions(+), 76 deletions(-) diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 465f5e25..6e8be8fc 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -195,7 +195,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService, serviceUserPlatformQuotaRepository) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) opsSystemLogSink := 
service.ProvideOpsSystemLogSink(opsRepository) - opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink) + opsService := service.ProvideOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink, settingService) encryptionKey, err := payment.ProvideEncryptionKey(configConfig) if err != nil { return nil, err diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index ff3c4301..cf19deda 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -557,6 +557,8 @@ func filterSchedulerExtra(extra map[string]any) map[string]any { "codex_usage_updated_at", "auto_pause_5h_threshold", "auto_pause_7d_threshold", + "auto_pause_5h_disabled", + "auto_pause_7d_disabled", } filtered := make(map[string]any) for _, key := range keys { diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index 9e4ec23e..a4667591 100644 --- a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -89,6 +89,8 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { "codex_usage_updated_at": "2026-05-29T09:00:00Z", "auto_pause_5h_threshold": 0.95, "auto_pause_7d_threshold": 0.96, + "auto_pause_5h_disabled": true, + "auto_pause_7d_disabled": false, }, } @@ -103,4 +105,6 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { require.Equal(t, "2026-05-29T09:00:00Z", got.Extra["codex_usage_updated_at"]) require.Equal(t, 0.95, 
got.Extra["auto_pause_5h_threshold"]) require.Equal(t, 0.96, got.Extra["auto_pause_7d_threshold"]) + require.Equal(t, true, got.Extra["auto_pause_5h_disabled"]) + require.Equal(t, false, got.Extra["auto_pause_7d_disabled"]) } diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index fd28fa86..47a8142a 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -974,6 +974,13 @@ func (s *defaultOpenAIAccountScheduler) isAccountRequestCompatible(ctx context.C if s != nil && s.service != nil && s.service.isOpenAIAccountRuntimeBlocked(account) { return false } + // Quota auto-pause must be evaluated during the initial filter too. Without it the + // TopK candidate pool can be filled with paused accounts and the later fresh/DB + // rechecks won't reach healthy accounts that fell outside TopK — manifesting as + // "no available accounts" even though healthy ones exist. + if paused, _ := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { + return false + } if req.RequestedModel != "" && !account.IsModelSupported(req.RequestedModel) { return false } diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 531769a7..da5f0a66 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -798,6 +798,63 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefa require.Equal(t, int64(35402), account.ID) } +// Regression: a per-account explicit-disable flag exempts the account from auto-pause +// even when a global default threshold is set. Without this, "leave threshold blank" +// silently falls back to global default and admins have no way to whitelist a single +// account. 
+func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerAccountDisableOverridesGlobalDefault(t *testing.T) { + ctx := withOpenAIQuotaAutoPauseSettings(context.Background(), OpsOpenAIAccountQuotaAutoPauseSettings{DefaultThreshold5h: 0.95}) + // Account has high usage AND no per-account threshold (would normally fall back to + // the global default and get paused), but the explicit disable flag is set. + primary := Account{ + ID: 35701, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "auto_pause_5h_disabled": true, + }, + } + secondary := Account{ID: 35702, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35701), account.ID) +} + +// Disable is per-window: disabling only 5h must still allow 7d auto-pause to fire. 
+func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerWindowDisableScoped(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35801, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "codex_7d_used_percent": 99.0, + "auto_pause_5h_disabled": true, + "auto_pause_7d_threshold": 0.95, + }, + } + secondary := Account{ID: 35802, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35802), account.ID, "7d auto-pause must still fire even though 5h is disabled") +} + func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_StaleUsageWindowResetSkipsPause(t *testing.T) { ctx := context.Background() // Usage is over threshold but the window's reset time has already passed, so the @@ -1399,6 +1456,85 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKFallback } } +// Regression: TopK initial filter must drop quota-auto-paused accounts. Otherwise +// the candidate pool is filled with paused accounts, healthy accounts fall outside +// TopK, and the scheduler returns "no available accounts" even though healthy ones +// exist. 
+func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKExcludesQuotaPaused(t *testing.T) { + ctx := context.Background() + groupID := int64(110) + accounts := []Account{ + { + ID: 37001, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 96.0, + "auto_pause_5h_threshold": 0.95, + }, + }, + { + ID: 37002, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 5, + }, + } + + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.LBTopK = 1 // TopK=1 makes the bug fatal: paused account would crowd out the healthy one entirely + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Priority = 0.4 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Load = 1.0 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue = 1.0 + + concurrencyCache := schedulerTestConcurrencyCache{ + loadMap: map[int64]*AccountLoadInfo{ + 37001: {AccountID: 37001, LoadRate: 5, WaitingCount: 0}, + 37002: {AccountID: 37002, LoadRate: 5, WaitingCount: 0}, + }, + acquireResults: map[int64]bool{ + 37002: true, + }, + } + + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + selection, decision, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.1", + nil, + OpenAIUpstreamTransportAny, + false, + ) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(37002), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + // Only the healthy account should ever enter the candidate pool; the paused one + // must be filtered out at the 
initial-filter stage. + require.Equal(t, 1, decision.CandidateCount) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + func TestOpenAIGatewayService_OpenAIAccountSchedulerMetrics(t *testing.T) { ctx := context.Background() groupID := int64(12) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e2534cc2..b1ae6a9a 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1360,14 +1360,21 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) if account == nil || !account.IsOpenAI() { return false, openAIQuotaAutoPauseDecision{} } + // Per-account explicit-disable flags must take precedence over the global default. + // Without these, leaving the account threshold blank means "use global default", + // so an admin has no way to exempt a single account from auto-pause once a global + // default exists. The disable flag is per-window so an account can opt out of + // only 5h or only 7d auto-pause. 
+ disabled5h := resolveAccountExtraBool(account.Extra, "auto_pause_5h_disabled") + disabled7d := resolveAccountExtraBool(account.Extra, "auto_pause_7d_disabled") threshold5h, threshold7d := resolveOpenAIQuotaAutoPauseThresholds(ctx, account) now := time.Now() - if threshold5h > 0 { + if !disabled5h && threshold5h > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "5h", now); ok && utilization >= threshold5h { return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, utilization: utilization} } } - if threshold7d > 0 { + if !disabled7d && threshold7d > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d", now); ok && utilization >= threshold7d { return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, utilization: utilization} } @@ -1375,6 +1382,39 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) return false, openAIQuotaAutoPauseDecision{} } +// resolveAccountExtraBool reads a bool-like value from account extra, tolerating +// the few shapes JSON unmarshalling may produce (real bool, "true"/"false" +// strings, 0/1 numbers). 
+func resolveAccountExtraBool(extra map[string]any, key string) bool { + if len(extra) == 0 { + return false + } + value, ok := extra[key] + if !ok || value == nil { + return false + } + switch v := value.(type) { + case bool: + return v + case string: + parsed, err := strconv.ParseBool(strings.TrimSpace(v)) + return err == nil && parsed + case float64: + return v != 0 + case float32: + return v != 0 + case int: + return v != 0 + case int64: + return v != 0 + case json.Number: + if i, err := v.Int64(); err == nil { + return i != 0 + } + } + return false +} + func resolveOpenAIQuotaAutoPauseThresholds(ctx context.Context, account *Account) (float64, float64) { threshold5h, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_5h_threshold") threshold7d, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_7d_threshold") diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 1cea72fa..2d7c5bd4 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -41,6 +41,11 @@ type OpsService struct { // cleanupReloader 由 wire 在 OpsCleanupService 构造完成后通过 SetCleanupReloader 注入。 // 解耦避免 OpsService -> OpsCleanupService 的硬依赖(cleanup 也读 settings,会循环)。 cleanupReloader CleanupReloader + + // quotaAutoPauseSink 由 wire 注入(通常是 SettingService.SetOpenAIQuotaAutoPauseSettings)。 + // UpdateOpsAdvancedSettings 写入新配置后调用,把最新的 quota auto-pause 全局默认阈值 + // 立即同步到调度热路径读取的内存缓存,避免下次请求才能感知新值。 + quotaAutoPauseSink func(OpsOpenAIAccountQuotaAutoPauseSettings) } // CleanupReloader 由 OpsCleanupService 实现。 @@ -57,6 +62,16 @@ func (s *OpsService) SetCleanupReloader(r CleanupReloader) { s.cleanupReloader = r } +// SetOpenAIQuotaAutoPauseSettingsSink 由 wire 注入,把最新的 quota auto-pause 全局默认 +// 阈值 push 到调度热路径读取的内存缓存。同 SetCleanupReloader 的解耦目的:避免 OpsService +// 持有 *SettingService 引入循环依赖。 +func (s *OpsService) SetOpenAIQuotaAutoPauseSettingsSink(sink func(OpsOpenAIAccountQuotaAutoPauseSettings)) { + if s == nil { + 
return + } + s.quotaAutoPauseSink = sink +} + func NewOpsService( opsRepo OpsRepository, settingRepo SettingRepository, diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index 23e92e5a..472f4e32 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -490,12 +490,12 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva if err := s.settingRepo.Set(ctx, SettingKeyOpsAdvancedSettings, string(raw)); err != nil { return nil, err } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: cfg.OpenAIAccountQuotaAutoPause, - expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), - }) + // Push the new quota auto-pause settings straight into the in-memory cache that + // the OpenAI scheduling hot path reads, so the next request observes the new value + // without waiting for the background refresher's TTL. + if s.quotaAutoPauseSink != nil { + s.quotaAutoPauseSink(cfg.OpenAIAccountQuotaAutoPause) + } // notify cleanup service to reload schedule/enabled. 
if s.cleanupReloader != nil { diff --git a/backend/internal/service/ops_settings_advanced_test.go b/backend/internal/service/ops_settings_advanced_test.go index d8598fe0..62803f94 100644 --- a/backend/internal/service/ops_settings_advanced_test.go +++ b/backend/internal/service/ops_settings_advanced_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/Wei-Shaw/sub2api/internal/config" ) @@ -103,11 +104,58 @@ func TestGetOpenAIQuotaAutoPauseSettings_ReadsDefaultsFromOpsAdvancedSettings(t repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.95,"default_threshold_7d":0.9}}` svc := NewSettingService(repo, &config.Config{}) - settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + // Warm the in-memory cache synchronously so the assertion below is deterministic. + // GetOpenAIQuotaAutoPauseSettings is non-blocking on the hot path (returns the + // cached value, refreshes asynchronously); for tests and startup, Warm is the + // synchronous entry point that guarantees a populated cache. + settings := svc.WarmOpenAIQuotaAutoPauseSettings(context.Background()) if settings.DefaultThreshold5h != 0.95 { t.Fatalf("DefaultThreshold5h = %v, want 0.95", settings.DefaultThreshold5h) } if settings.DefaultThreshold7d != 0.9 { t.Fatalf("DefaultThreshold7d = %v, want 0.9", settings.DefaultThreshold7d) } + + // Subsequent Get must hit the warm cache and return the same value without any DB + // access — that's the hot-path invariant. + cached := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if cached.DefaultThreshold5h != 0.95 || cached.DefaultThreshold7d != 0.9 { + t.Fatalf("cached read = %+v, want {0.95, 0.9}", cached) + } +} + +// Hot-path invariant: a Get with cold cache must return immediately (zero defaults) +// rather than blocking on the DB. The async refresher will populate the cache for +// subsequent calls. 
+func TestGetOpenAIQuotaAutoPauseSettings_ColdCacheNonBlocking(t *testing.T) { + repo := newRuntimeSettingRepoStub() + repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.7}}` + svc := NewSettingService(repo, &config.Config{}) + + start := time.Now() + settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + elapsed := time.Since(start) + if elapsed > 50*time.Millisecond { + t.Fatalf("cold-cache Get must be non-blocking, took %v", elapsed) + } + // Cold cache means we get zero defaults (the async refresh hasn't completed yet). + if settings.DefaultThreshold5h != 0 || settings.DefaultThreshold7d != 0 { + t.Fatalf("cold-cache Get = %+v, want zeroes", settings) + } +} + +// Explicit cache write (e.g. from UpdateOpsAdvancedSettings) must be visible on the +// very next read without any DB roundtrip. +func TestSetOpenAIQuotaAutoPauseSettings_VisibleImmediately(t *testing.T) { + svc := NewSettingService(newRuntimeSettingRepoStub(), &config.Config{}) + + svc.SetOpenAIQuotaAutoPauseSettings(OpsOpenAIAccountQuotaAutoPauseSettings{ + DefaultThreshold5h: 0.88, + DefaultThreshold7d: 0.77, + }) + + got := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if got.DefaultThreshold5h != 0.88 || got.DefaultThreshold7d != 0.77 { + t.Fatalf("after Set, Get = %+v, want {0.88, 0.77}", got) + } } diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 40e98b88..98acdb80 100644 --- a/backend/internal/service/setting_service.go +++ b/backend/internal/service/setting_service.go @@ -14,7 +14,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -162,28 +161,7 @@ const openAIQuotaAutoPauseSettingsCacheTTL = 60 * time.Second const openAIQuotaAutoPauseSettingsErrorTTL = 5 * time.Second const openAIQuotaAutoPauseSettingsDBTimeout = 5 * time.Second -var openAIQuotaAutoPauseSettingsCache sync.Map // map[string]*cachedOpenAIQuotaAutoPauseSettings 
-var openAIQuotaAutoPauseSettingsSF singleflight.Group - -func openAIQuotaAutoPauseSettingsCacheKey(repo SettingRepository) string { - if repo == nil { - return "nil" - } - return fmt.Sprintf("%T:%p", repo, repo) -} - -func loadOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository) (*cachedOpenAIQuotaAutoPauseSettings, bool) { - value, ok := openAIQuotaAutoPauseSettingsCache.Load(openAIQuotaAutoPauseSettingsCacheKey(repo)) - if !ok || value == nil { - return nil, false - } - cached, ok := value.(*cachedOpenAIQuotaAutoPauseSettings) - return cached, ok && cached != nil -} - -func storeOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository, cached *cachedOpenAIQuotaAutoPauseSettings) { - openAIQuotaAutoPauseSettingsCache.Store(openAIQuotaAutoPauseSettingsCacheKey(repo), cached) -} +const openAIQuotaAutoPauseSettingsRefreshKey = "openai_quota_auto_pause_settings" // DefaultSubscriptionGroupReader validates group references used by default subscriptions. type DefaultSubscriptionGroupReader interface { @@ -209,6 +187,15 @@ type SettingService struct { openAICodexUASF singleflight.Group openAIAllowCodexPluginCache atomic.Value // *cachedOpenAIAllowCodexPlugin openAIAllowCodexPluginSF singleflight.Group + + // openAIQuotaAutoPauseSettingsCache holds the most recently observed quota auto-pause + // settings. GetOpenAIQuotaAutoPauseSettings reads this atomic.Value on the request hot + // path without ever blocking on the DB; when the cached entry expires, a background + // goroutine refreshes it via openAIQuotaAutoPauseSettingsSF (stale-while-revalidate). + // This per-service field also gives tests natural isolation — each SettingService + // instance owns its own cache, no shared package-level state. 
+ openAIQuotaAutoPauseSettingsCache atomic.Value // *cachedOpenAIQuotaAutoPauseSettings + openAIQuotaAutoPauseSettingsSF singleflight.Group } // DefaultPlatformQuotaSetting 单 platform 三档限额(nil = 沿用上层;0 = 显式禁用;>0 = 上限) @@ -2060,9 +2047,17 @@ func (s *SettingService) refreshCachedSettings(settings *SystemSettings) { enabled: settings.OpenAIAdvancedSchedulerEnabled, expiresAt: time.Now().Add(openAIAdvancedSchedulerSettingCacheTTL).UnixNano(), }) - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - openAIQuotaAutoPauseSettingsCache.Delete(cacheKey) + // Invalidate the quota auto-pause cache and let the next read trigger a fresh load. + // We can't know from here whether ops_advanced_settings was also touched, so be + // defensive: store an expired entry — GetOpenAIQuotaAutoPauseSettings will serve + // stale and kick off an async refresh, never blocking the request that follows. + s.openAIQuotaAutoPauseSettingsSF.Forget(openAIQuotaAutoPauseSettingsRefreshKey) + if cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); cached != nil { + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: cached.settings, + expiresAt: 0, + }) + } if s.cfg != nil { s.cfg.SetTrustForwardedIPForAPIKeyACL(settings.APIKeyACLTrustForwardedIP) } @@ -4484,49 +4479,104 @@ func (s *SettingService) GetClaudeCodeVersionBounds(ctx context.Context) (min, m return b.min, b.max } +// GetOpenAIQuotaAutoPauseSettings returns the current global default quota auto-pause +// settings. It is invoked on the OpenAI scheduling hot path (once per request) and is +// therefore designed to never block on the DB: +// +// - Fresh cached value → returned immediately. +// - Stale or empty cache → the last known value is returned, and a background +// goroutine refreshes the cache via singleflight (stale-while-revalidate). 
+// - First call with no cache yet → zero defaults are returned and the same async +// refresh is kicked off; the next call gets the freshly populated value. +// +// Callers that need the freshly persisted value synchronously (tests, post-update +// confirmation, optional startup warm-up) should call WarmOpenAIQuotaAutoPauseSettings. func (s *SettingService) GetOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + now := time.Now().UnixNano() + if cached != nil && now < cached.expiresAt { + return cached.settings + } + // Stale or unset: trigger background refresh without blocking this request. + // singleflight.DoChan dedupes concurrent refreshes; we deliberately ignore the + // returned channel — the result is observable via the atomic cache. + s.openAIQuotaAutoPauseSettingsSF.DoChan(openAIQuotaAutoPauseSettingsRefreshKey, func() (any, error) { + s.refreshOpenAIQuotaAutoPauseSettings(context.Background()) + return nil, nil + }) + if cached != nil { + return cached.settings // serve stale value while revalidating + } + return OpsOpenAIAccountQuotaAutoPauseSettings{} +} + +// WarmOpenAIQuotaAutoPauseSettings synchronously loads the quota auto-pause settings +// into the in-memory cache. Useful for application startup (so the first request hits +// a warm cache) and for tests that need deterministic reads immediately after +// constructing the service. 
+func (s *SettingService) WarmOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + s.refreshOpenAIQuotaAutoPauseSettings(ctx) + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + if cached == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + return cached.settings +} + +// refreshOpenAIQuotaAutoPauseSettings reads the latest settings from the DB and stores +// them into the in-memory cache. On error it stores the prior value (or zero defaults +// if nothing is cached yet) with the shorter error TTL so the next refresh comes +// sooner. Always uses its own timeout-bounded context to keep refresh latency +// predictable regardless of the caller. +func (s *SettingService) refreshOpenAIQuotaAutoPauseSettings(ctx context.Context) { + if s == nil || s.settingRepo == nil { + return + } + dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) + defer cancel() + + settings := OpsOpenAIAccountQuotaAutoPauseSettings{} + ttl := openAIQuotaAutoPauseSettingsCacheTTL + raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) + if err == nil { + cfg := defaultOpsAdvancedSettings() + if strings.TrimSpace(raw) != "" { + if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { + normalizeOpsAdvancedSettings(cfg) + } } + settings = cfg.OpenAIAccountQuotaAutoPause + } else if !errors.Is(err, ErrSettingNotFound) { + // Real error: keep serving prior value but refresh sooner. 
+ if prior, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); prior != nil { + settings = prior.settings + } + ttl = openAIQuotaAutoPauseSettingsErrorTTL } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - result, _, _ := openAIQuotaAutoPauseSettingsSF.Do(cacheKey, func() (any, error) { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings, nil - } - } - - settings := OpsOpenAIAccountQuotaAutoPauseSettings{} - ttl := openAIQuotaAutoPauseSettingsCacheTTL - if s != nil && s.settingRepo != nil { - dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) - defer cancel() - raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) - if err == nil { - cfg := defaultOpsAdvancedSettings() - if strings.TrimSpace(raw) != "" { - if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { - normalizeOpsAdvancedSettings(cfg) - } - } - settings = cfg.OpenAIAccountQuotaAutoPause - } else { - ttl = openAIQuotaAutoPauseSettingsErrorTTL - } - } - - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: settings, - expiresAt: time.Now().Add(ttl).UnixNano(), - }) - return settings, nil + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(ttl).UnixNano(), }) +} - settings, _ := result.(OpsOpenAIAccountQuotaAutoPauseSettings) - return settings +// SetOpenAIQuotaAutoPauseSettings writes the given settings directly into the in-memory +// cache. Called from settings-write code paths so that the next read reflects the new +// value immediately, without waiting for the background refresh. 
+func (s *SettingService) SetOpenAIQuotaAutoPauseSettings(settings OpsOpenAIAccountQuotaAutoPauseSettings) { + if s == nil { + return + } + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), + }) } // GetRectifierSettings 获取请求整流器配置 diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index b22e10ae..d3e4ce51 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -396,6 +396,46 @@ func ProvideBackupService( return svc } +// ProvideOpsService constructs OpsService and wires the SettingService-backed quota +// auto-pause cache sink. Mirrors the SetCleanupReloader pattern: OpsService doesn't +// hold a *SettingService reference, but wire injects a tiny callback so writes to +// ops_advanced_settings immediately propagate into the scheduler hot-path cache. +func ProvideOpsService( + opsRepo OpsRepository, + settingRepo SettingRepository, + cfg *config.Config, + accountRepo AccountRepository, + userRepo UserRepository, + concurrencyService *ConcurrencyService, + gatewayService *GatewayService, + openAIGatewayService *OpenAIGatewayService, + geminiCompatService *GeminiMessagesCompatService, + antigravityGatewayService *AntigravityGatewayService, + systemLogSink *OpsSystemLogSink, + settingService *SettingService, +) *OpsService { + svc := NewOpsService( + opsRepo, + settingRepo, + cfg, + accountRepo, + userRepo, + concurrencyService, + gatewayService, + openAIGatewayService, + geminiCompatService, + antigravityGatewayService, + systemLogSink, + ) + if settingService != nil { + svc.SetOpenAIQuotaAutoPauseSettingsSink(settingService.SetOpenAIQuotaAutoPauseSettings) + // Optional warm-up so the first scheduled request after process start observes + // a populated cache rather than zero defaults. Best-effort, sync-bounded. 
+ settingService.WarmOpenAIQuotaAutoPauseSettings(context.Background()) + } + return svc +} + // ProvideSettingService wires SettingService with group reader and proxy repo. func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, proxyRepo ProxyRepository, cfg *config.Config) *SettingService { svc := NewSettingService(settingRepo, cfg) @@ -481,7 +521,7 @@ var ProviderSet = wire.NewSet( NewDataManagementService, ProvideBackupService, ProvideOpsSystemLogSink, - NewOpsService, + ProvideOpsService, ProvideOpsMetricsCollector, ProvideOpsAggregationService, ProvideOpsAlertEvaluatorService, diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 470c0bfb..8dc85d0e 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -1791,6 +1791,28 @@ v-if="account?.platform === 'openai'" class="border-t border-gray-200 pt-4 dark:border-dark-600 space-y-4" > +
+
+ + +
+

{{ t('admin.accounts.autoPauseDisabledHint') }}

+

{{ t('admin.accounts.autoPauseThresholdHint') }}

+
+
+ + +
+

{{ t('admin.accounts.autoPauseDisabledHint') }}

+

{{ t('admin.accounts.autoPauseThresholdHint') }}

@@ -2481,6 +2527,8 @@ const interceptWarmupRequests = ref(false) const autoPauseOnExpired = ref(false) const autoPause5hThreshold = ref(null) const autoPause7dThreshold = ref(null) +const autoPause5hDisabled = ref(false) +const autoPause7dDisabled = ref(false) const mixedScheduling = ref(false) // For antigravity accounts: enable mixed scheduling const allowOverages = ref(false) // For antigravity accounts: enable AI Credits overages const antigravityModelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist') @@ -2901,6 +2949,8 @@ const syncFormFromAccount = (newAccount: Account | null) => { allowOverages.value = extra?.allow_overages === true autoPause5hThreshold.value = typeof extra?.auto_pause_5h_threshold === 'number' ? extra.auto_pause_5h_threshold * 100 : null autoPause7dThreshold.value = typeof extra?.auto_pause_7d_threshold === 'number' ? extra.auto_pause_7d_threshold * 100 : null + autoPause5hDisabled.value = extra?.auto_pause_5h_disabled === true + autoPause7dDisabled.value = extra?.auto_pause_7d_disabled === true // Load OpenAI passthrough toggle (OpenAI OAuth/API Key) openaiPassthroughEnabled.value = false @@ -4064,6 +4114,16 @@ const handleSubmit = async () => { } else { delete newExtra.auto_pause_7d_threshold } + if (autoPause5hDisabled.value) { + newExtra.auto_pause_5h_disabled = true + } else { + delete newExtra.auto_pause_5h_disabled + } + if (autoPause7dDisabled.value) { + newExtra.auto_pause_7d_disabled = true + } else { + delete newExtra.auto_pause_7d_disabled + } delete newExtra.codex_image_generation_bridge_enabled if (codexImageGenerationBridgeMode.value === 'inherit') { diff --git a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts index 6db63831..f4865de9 100644 --- a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts +++ b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts @@ -352,6 +352,27 @@ 
describe('EditAccountModal', () => { expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_7d_threshold).toBe(0.96) }) + it('submits OpenAI quota auto-pause disable flag in extra', async () => { + // Toggling the per-account disable flag must persist as auto_pause_5h_disabled + // so an admin can exempt one account from auto-pause even when a global default + // threshold is configured (otherwise leaving the threshold blank would silently + // fall back to the global default). + const account = buildAccount() + updateAccountMock.mockReset() + checkMixedChannelRiskMock.mockReset() + checkMixedChannelRiskMock.mockResolvedValue({ has_risk: false }) + updateAccountMock.mockResolvedValue(account) + + const wrapper = mountModal(account) + + await wrapper.get('[data-testid="auto-pause-5h-disabled"]').trigger('click') + await wrapper.get('form#edit-account-form').trigger('submit.prevent') + + expect(updateAccountMock).toHaveBeenCalledTimes(1) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_5h_disabled).toBe(true) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_7d_disabled).toBeUndefined() + }) + it('keeps at least one OpenAI APIKey endpoint capability selected', async () => { const account = buildAccount() updateAccountMock.mockReset() diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index 8ab90961..6735029c 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -3477,7 +3477,10 @@ export default { autoPauseOnExpiredDesc: 'When enabled, the account will auto pause scheduling after it expires', autoPause5hThreshold: '5h Usage Threshold (%)', autoPause7dThreshold: '7d Usage Threshold (%)', - autoPauseThresholdHint: 'Leave empty or set 0 to disable. 
Reaching the threshold only skips the account during scheduling and does not modify schedulable.', + autoPauseThresholdHint: 'Leave empty or set 0 to use the global default threshold (configured in Ops settings); set a value to override the global default. Reaching the threshold only skips the account during scheduling and does not modify schedulable.', + autoPause5hDisabled: 'Disable 5h auto-pause', + autoPause7dDisabled: 'Disable 7d auto-pause', + autoPauseDisabledHint: 'When enabled, this account is never auto-paused (even if a global default threshold is configured).', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: 'Quota Control', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 4f1d1f13..abb8dff7 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -3615,7 +3615,10 @@ export default { autoPauseOnExpiredDesc: '启用后,账号过期将自动暂停调度', autoPause5hThreshold: '5h 用量阈值(%)', autoPause7dThreshold: '7d 用量阈值(%)', - autoPauseThresholdHint: '填 0 或留空表示不启用;达到阈值后仅在调度时跳过账号,不修改 schedulable。', + autoPauseThresholdHint: '留空或填 0 表示使用全局默认阈值(在运维设置中配置);填具体值则覆盖全局默认。达到阈值后仅在调度时跳过账号,不修改 schedulable。', + autoPause5hDisabled: '禁用 5h 自动暂停', + autoPause7dDisabled: '禁用 7d 自动暂停', + autoPauseDisabledHint: '开启后该账号永不进入自动暂停(即使全局默认阈值已配置)。', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: '配额控制',