From c9caadb3782a4e658ce46789df092d0b15d58172 Mon Sep 17 00:00:00 2001 From: wucm667 Date: Fri, 29 May 2026 14:32:45 +0800 Subject: [PATCH] fix(account): address second-round review on quota auto-pause - TopK initial filter now drops quota-paused accounts: fold the quota check into isAccountRequestCompatible so session-hash, TopK pool, and per-candidate rechecks all skip paused accounts. Previously the candidate pool was built without the quota check, so paused accounts could fill TopK and leave the scheduler returning "no available accounts" even with healthy ones available. - Add per-account explicit disable flags auto_pause_5h_disabled / auto_pause_7d_disabled with toggles in EditAccountModal. Without these, leaving the account threshold blank silently falls back to the global default, so admins could not exempt a single account once a global default existed. Disable is per-window: an account can opt out of 5h auto-pause while still honoring 7d. Schedule snapshot whitelist includes the new fields, i18n EN/ZH updated, threshold-hint text revised to explain "blank = global default". - Move quota auto-pause settings off the request hot path: replace the per-repo TTL+singleflight sync DB read with a per-SettingService stale-while-revalidate in-memory snapshot. Get is non-blocking (atomic.Pointer load + async refresh on staleness); writes via UpdateOpsAdvancedSettings push directly into the cache through an injected sink; wire warms the cache at startup. Adds Warm (sync) for tests/init and SetOpenAIQuotaAutoPauseSettings (sink target). Co-Authored-By: Claude Opus 4.7 --- backend/cmd/server/wire_gen.go | 2 +- .../internal/repository/scheduler_cache.go | 2 + .../repository/scheduler_cache_unit_test.go | 4 + .../service/openai_account_scheduler.go | 7 + .../service/openai_account_scheduler_test.go | 136 ++++++++++++++ .../service/openai_gateway_service.go | 44 ++++- backend/internal/service/ops_service.go | 15 ++ backend/internal/service/ops_settings.go | 12 +- .../service/ops_settings_advanced_test.go | 50 ++++- backend/internal/service/setting_service.go | 176 +++++++++++------- backend/internal/service/wire.go | 42 ++++- .../components/account/EditAccountModal.vue | 60 ++++++ .../__tests__/EditAccountModal.spec.ts | 21 +++ frontend/src/i18n/locales/en.ts | 5 +- frontend/src/i18n/locales/zh.ts | 5 +- 15 files changed, 505 insertions(+), 76 deletions(-) diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 465f5e25..6e8be8fc 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -195,7 +195,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService, serviceUserPlatformQuotaRepository) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) opsSystemLogSink := service.ProvideOpsSystemLogSink(opsRepository) - opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink) + opsService := service.ProvideOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink, settingService) encryptionKey, err := payment.ProvideEncryptionKey(configConfig) if err != nil { return nil, err diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index ff3c4301..cf19deda 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -557,6 +557,8 @@ func filterSchedulerExtra(extra map[string]any) map[string]any { "codex_usage_updated_at", "auto_pause_5h_threshold", "auto_pause_7d_threshold", + "auto_pause_5h_disabled", + "auto_pause_7d_disabled", } filtered := make(map[string]any) for _, key := range keys { diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index 9e4ec23e..a4667591 100644 --- a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -89,6 +89,8 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { "codex_usage_updated_at": "2026-05-29T09:00:00Z", "auto_pause_5h_threshold": 0.95, "auto_pause_7d_threshold": 0.96, + "auto_pause_5h_disabled": true, + "auto_pause_7d_disabled": false, }, } @@ -103,4 +105,6 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { require.Equal(t, "2026-05-29T09:00:00Z", got.Extra["codex_usage_updated_at"]) require.Equal(t, 0.95, got.Extra["auto_pause_5h_threshold"]) require.Equal(t, 0.96, got.Extra["auto_pause_7d_threshold"]) + require.Equal(t, true, got.Extra["auto_pause_5h_disabled"]) + require.Equal(t, false, got.Extra["auto_pause_7d_disabled"]) } diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index fd28fa86..47a8142a 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -974,6 +974,13 @@ func (s *defaultOpenAIAccountScheduler) isAccountRequestCompatible(ctx context.C if s != nil && s.service != nil && s.service.isOpenAIAccountRuntimeBlocked(account) { return false } + // Quota auto-pause must be evaluated during the initial filter too. Without it the + // TopK candidate pool can be filled with paused accounts and the later fresh/DB + // rechecks won't reach healthy accounts that fell outside TopK — manifesting as + // "no available accounts" even though healthy ones exist. + if paused, _ := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { + return false + } if req.RequestedModel != "" && !account.IsModelSupported(req.RequestedModel) { return false } diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 531769a7..da5f0a66 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -798,6 +798,63 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefa require.Equal(t, int64(35402), account.ID) } +// Regression: a per-account explicit-disable flag exempts the account from auto-pause +// even when a global default threshold is set. Without this, "leave threshold blank" +// silently falls back to global default and admins have no way to whitelist a single +// account. +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerAccountDisableOverridesGlobalDefault(t *testing.T) { + ctx := withOpenAIQuotaAutoPauseSettings(context.Background(), OpsOpenAIAccountQuotaAutoPauseSettings{DefaultThreshold5h: 0.95}) + // Account has high usage AND no per-account threshold (would normally fall back to + // the global default and get paused), but the explicit disable flag is set. + primary := Account{ + ID: 35701, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "auto_pause_5h_disabled": true, + }, + } + secondary := Account{ID: 35702, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35701), account.ID) +} + +// Disable is per-window: disabling only 5h must still allow 7d auto-pause to fire. +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerWindowDisableScoped(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35801, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "codex_7d_used_percent": 99.0, + "auto_pause_5h_disabled": true, + "auto_pause_7d_threshold": 0.95, + }, + } + secondary := Account{ID: 35802, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35802), account.ID, "7d auto-pause must still fire even though 5h is disabled") +} + func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_StaleUsageWindowResetSkipsPause(t *testing.T) { ctx := context.Background() // Usage is over threshold but the window's reset time has already passed, so the @@ -1399,6 +1456,85 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKFallback } } +// Regression: TopK initial filter must drop quota-auto-paused accounts. Otherwise +// the candidate pool is filled with paused accounts, healthy accounts fall outside +// TopK, and the scheduler returns "no available accounts" even though healthy ones +// exist. +func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKExcludesQuotaPaused(t *testing.T) { + ctx := context.Background() + groupID := int64(110) + accounts := []Account{ + { + ID: 37001, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 96.0, + "auto_pause_5h_threshold": 0.95, + }, + }, + { + ID: 37002, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 5, + }, + } + + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.LBTopK = 1 // TopK=1 makes the bug fatal: paused account would crowd out the healthy one entirely + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Priority = 0.4 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Load = 1.0 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue = 1.0 + + concurrencyCache := schedulerTestConcurrencyCache{ + loadMap: map[int64]*AccountLoadInfo{ + 37001: {AccountID: 37001, LoadRate: 5, WaitingCount: 0}, + 37002: {AccountID: 37002, LoadRate: 5, WaitingCount: 0}, + }, + acquireResults: map[int64]bool{ + 37002: true, + }, + } + + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + selection, decision, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.1", + nil, + OpenAIUpstreamTransportAny, + false, + ) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(37002), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + // Only the healthy account should ever enter the candidate pool; the paused one + // must be filtered out at the initial-filter stage. + require.Equal(t, 1, decision.CandidateCount) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + func TestOpenAIGatewayService_OpenAIAccountSchedulerMetrics(t *testing.T) { ctx := context.Background() groupID := int64(12) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e2534cc2..b1ae6a9a 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1360,14 +1360,21 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) if account == nil || !account.IsOpenAI() { return false, openAIQuotaAutoPauseDecision{} } + // Per-account explicit-disable flags must take precedence over the global default. + // Without these, leaving the account threshold blank means "use global default", + // so an admin has no way to exempt a single account from auto-pause once a global + // default exists. The disable flag is per-window so an account can opt out of + // only 5h or only 7d auto-pause. + disabled5h := resolveAccountExtraBool(account.Extra, "auto_pause_5h_disabled") + disabled7d := resolveAccountExtraBool(account.Extra, "auto_pause_7d_disabled") threshold5h, threshold7d := resolveOpenAIQuotaAutoPauseThresholds(ctx, account) now := time.Now() - if threshold5h > 0 { + if !disabled5h && threshold5h > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "5h", now); ok && utilization >= threshold5h { return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, utilization: utilization} } } - if threshold7d > 0 { + if !disabled7d && threshold7d > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d", now); ok && utilization >= threshold7d { return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, utilization: utilization} } @@ -1375,6 +1382,39 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) return false, openAIQuotaAutoPauseDecision{} } +// resolveAccountExtraBool reads a bool-like value from account extra, tolerating +// the few shapes JSON unmarshalling may produce (real bool, "true"/"false" +// strings, 0/1 numbers). +func resolveAccountExtraBool(extra map[string]any, key string) bool { + if len(extra) == 0 { + return false + } + value, ok := extra[key] + if !ok || value == nil { + return false + } + switch v := value.(type) { + case bool: + return v + case string: + parsed, err := strconv.ParseBool(strings.TrimSpace(v)) + return err == nil && parsed + case float64: + return v != 0 + case float32: + return v != 0 + case int: + return v != 0 + case int64: + return v != 0 + case json.Number: + if i, err := v.Int64(); err == nil { + return i != 0 + } + } + return false +} + func resolveOpenAIQuotaAutoPauseThresholds(ctx context.Context, account *Account) (float64, float64) { threshold5h, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_5h_threshold") threshold7d, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_7d_threshold") diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 1cea72fa..2d7c5bd4 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -41,6 +41,11 @@ type OpsService struct { // cleanupReloader 由 wire 在 OpsCleanupService 构造完成后通过 SetCleanupReloader 注入。 // 解耦避免 OpsService -> OpsCleanupService 的硬依赖(cleanup 也读 settings,会循环)。 cleanupReloader CleanupReloader + + // quotaAutoPauseSink 由 wire 注入(通常是 SettingService.SetOpenAIQuotaAutoPauseSettings)。 + // UpdateOpsAdvancedSettings 写入新配置后调用,把最新的 quota auto-pause 全局默认阈值 + // 立即同步到调度热路径读取的内存缓存,避免下次请求才能感知新值。 + quotaAutoPauseSink func(OpsOpenAIAccountQuotaAutoPauseSettings) } // CleanupReloader 由 OpsCleanupService 实现。 @@ -57,6 +62,16 @@ func (s *OpsService) SetCleanupReloader(r CleanupReloader) { s.cleanupReloader = r } +// SetOpenAIQuotaAutoPauseSettingsSink 由 wire 注入,把最新的 quota auto-pause 全局默认 +// 阈值 push 到调度热路径读取的内存缓存。同 SetCleanupReloader 的解耦目的:避免 OpsService +// 持有 *SettingService 引入循环依赖。 +func (s *OpsService) SetOpenAIQuotaAutoPauseSettingsSink(sink func(OpsOpenAIAccountQuotaAutoPauseSettings)) { + if s == nil { + return + } + s.quotaAutoPauseSink = sink +} + func NewOpsService( opsRepo OpsRepository, settingRepo SettingRepository, diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index 23e92e5a..472f4e32 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -490,12 +490,12 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva if err := s.settingRepo.Set(ctx, SettingKeyOpsAdvancedSettings, string(raw)); err != nil { return nil, err } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: cfg.OpenAIAccountQuotaAutoPause, - expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), - }) + // Push the new quota auto-pause settings straight into the in-memory cache that + // the OpenAI scheduling hot path reads, so the next request observes the new value + // without waiting for the background refresher's TTL. + if s.quotaAutoPauseSink != nil { + s.quotaAutoPauseSink(cfg.OpenAIAccountQuotaAutoPause) + } // notify cleanup service to reload schedule/enabled. if s.cleanupReloader != nil { diff --git a/backend/internal/service/ops_settings_advanced_test.go b/backend/internal/service/ops_settings_advanced_test.go index d8598fe0..62803f94 100644 --- a/backend/internal/service/ops_settings_advanced_test.go +++ b/backend/internal/service/ops_settings_advanced_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/Wei-Shaw/sub2api/internal/config" ) @@ -103,11 +104,58 @@ func TestGetOpenAIQuotaAutoPauseSettings_ReadsDefaultsFromOpsAdvancedSettings(t repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.95,"default_threshold_7d":0.9}}` svc := NewSettingService(repo, &config.Config{}) - settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + // Warm the in-memory cache synchronously so the assertion below is deterministic. + // GetOpenAIQuotaAutoPauseSettings is non-blocking on the hot path (returns the + // cached value, refreshes asynchronously); for tests and startup, Warm is the + // synchronous entry point that guarantees a populated cache. + settings := svc.WarmOpenAIQuotaAutoPauseSettings(context.Background()) if settings.DefaultThreshold5h != 0.95 { t.Fatalf("DefaultThreshold5h = %v, want 0.95", settings.DefaultThreshold5h) } if settings.DefaultThreshold7d != 0.9 { t.Fatalf("DefaultThreshold7d = %v, want 0.9", settings.DefaultThreshold7d) } + + // Subsequent Get must hit the warm cache and return the same value without any DB + // access — that's the hot-path invariant. + cached := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if cached.DefaultThreshold5h != 0.95 || cached.DefaultThreshold7d != 0.9 { + t.Fatalf("cached read = %+v, want {0.95, 0.9}", cached) + } +} + +// Hot-path invariant: a Get with cold cache must return immediately (zero defaults) +// rather than blocking on the DB. The async refresher will populate the cache for +// subsequent calls. +func TestGetOpenAIQuotaAutoPauseSettings_ColdCacheNonBlocking(t *testing.T) { + repo := newRuntimeSettingRepoStub() + repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.7}}` + svc := NewSettingService(repo, &config.Config{}) + + start := time.Now() + settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + elapsed := time.Since(start) + if elapsed > 50*time.Millisecond { + t.Fatalf("cold-cache Get must be non-blocking, took %v", elapsed) + } + // Cold cache means we get zero defaults (the async refresh hasn't completed yet). + if settings.DefaultThreshold5h != 0 || settings.DefaultThreshold7d != 0 { + t.Fatalf("cold-cache Get = %+v, want zeroes", settings) + } +} + +// Explicit cache write (e.g. from UpdateOpsAdvancedSettings) must be visible on the +// very next read without any DB roundtrip. +func TestSetOpenAIQuotaAutoPauseSettings_VisibleImmediately(t *testing.T) { + svc := NewSettingService(newRuntimeSettingRepoStub(), &config.Config{}) + + svc.SetOpenAIQuotaAutoPauseSettings(OpsOpenAIAccountQuotaAutoPauseSettings{ + DefaultThreshold5h: 0.88, + DefaultThreshold7d: 0.77, + }) + + got := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if got.DefaultThreshold5h != 0.88 || got.DefaultThreshold7d != 0.77 { + t.Fatalf("after Set, Get = %+v, want {0.88, 0.77}", got) + } } diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 40e98b88..98acdb80 100644 --- a/backend/internal/service/setting_service.go +++ b/backend/internal/service/setting_service.go @@ -14,7 +14,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -162,28 +161,7 @@ const openAIQuotaAutoPauseSettingsCacheTTL = 60 * time.Second const openAIQuotaAutoPauseSettingsErrorTTL = 5 * time.Second const openAIQuotaAutoPauseSettingsDBTimeout = 5 * time.Second -var openAIQuotaAutoPauseSettingsCache sync.Map // map[string]*cachedOpenAIQuotaAutoPauseSettings -var openAIQuotaAutoPauseSettingsSF singleflight.Group - -func openAIQuotaAutoPauseSettingsCacheKey(repo SettingRepository) string { - if repo == nil { - return "nil" - } - return fmt.Sprintf("%T:%p", repo, repo) -} - -func loadOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository) (*cachedOpenAIQuotaAutoPauseSettings, bool) { - value, ok := openAIQuotaAutoPauseSettingsCache.Load(openAIQuotaAutoPauseSettingsCacheKey(repo)) - if !ok || value == nil { - return nil, false - } - cached, ok := value.(*cachedOpenAIQuotaAutoPauseSettings) - return cached, ok && cached != nil -} - -func storeOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository, cached *cachedOpenAIQuotaAutoPauseSettings) { - openAIQuotaAutoPauseSettingsCache.Store(openAIQuotaAutoPauseSettingsCacheKey(repo), cached) -} +const openAIQuotaAutoPauseSettingsRefreshKey = "openai_quota_auto_pause_settings" // DefaultSubscriptionGroupReader validates group references used by default subscriptions. type DefaultSubscriptionGroupReader interface { @@ -209,6 +187,15 @@ type SettingService struct { openAICodexUASF singleflight.Group openAIAllowCodexPluginCache atomic.Value // *cachedOpenAIAllowCodexPlugin openAIAllowCodexPluginSF singleflight.Group + + // openAIQuotaAutoPauseSettingsCache holds the most recently observed quota auto-pause + // settings. GetOpenAIQuotaAutoPauseSettings reads this atomic.Value on the request hot + // path without ever blocking on the DB; when the cached entry expires, a background + // goroutine refreshes it via openAIQuotaAutoPauseSettingsSF (stale-while-revalidate). + // This per-service field also gives tests natural isolation — each SettingService + // instance owns its own cache, no shared package-level state. + openAIQuotaAutoPauseSettingsCache atomic.Value // *cachedOpenAIQuotaAutoPauseSettings + openAIQuotaAutoPauseSettingsSF singleflight.Group } // DefaultPlatformQuotaSetting 单 platform 三档限额(nil = 沿用上层;0 = 显式禁用;>0 = 上限) @@ -2060,9 +2047,17 @@ func (s *SettingService) refreshCachedSettings(settings *SystemSettings) { enabled: settings.OpenAIAdvancedSchedulerEnabled, expiresAt: time.Now().Add(openAIAdvancedSchedulerSettingCacheTTL).UnixNano(), }) - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - openAIQuotaAutoPauseSettingsCache.Delete(cacheKey) + // Invalidate the quota auto-pause cache and let the next read trigger a fresh load. + // We can't know from here whether ops_advanced_settings was also touched, so be + // defensive: store an expired entry — GetOpenAIQuotaAutoPauseSettings will serve + // stale and kick off an async refresh, never blocking the request that follows. + s.openAIQuotaAutoPauseSettingsSF.Forget(openAIQuotaAutoPauseSettingsRefreshKey) + if cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); cached != nil { + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: cached.settings, + expiresAt: 0, + }) + } if s.cfg != nil { s.cfg.SetTrustForwardedIPForAPIKeyACL(settings.APIKeyACLTrustForwardedIP) } @@ -4484,49 +4479,104 @@ func (s *SettingService) GetClaudeCodeVersionBounds(ctx context.Context) (min, m return b.min, b.max } +// GetOpenAIQuotaAutoPauseSettings returns the current global default quota auto-pause +// settings. It is invoked on the OpenAI scheduling hot path (once per request) and is +// therefore designed to never block on the DB: +// +// - Fresh cached value → returned immediately. +// - Stale or empty cache → the last known value is returned, and a background +// goroutine refreshes the cache via singleflight (stale-while-revalidate). +// - First call with no cache yet → zero defaults are returned and the same async +// refresh is kicked off; the next call gets the freshly populated value. +// +// Callers that need the freshly persisted value synchronously (tests, post-update +// confirmation, optional startup warm-up) should call WarmOpenAIQuotaAutoPauseSettings. func (s *SettingService) GetOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + now := time.Now().UnixNano() + if cached != nil && now < cached.expiresAt { + return cached.settings + } + // Stale or unset: trigger background refresh without blocking this request. + // singleflight.DoChan dedupes concurrent refreshes; we deliberately ignore the + // returned channel — the result is observable via the atomic cache. + s.openAIQuotaAutoPauseSettingsSF.DoChan(openAIQuotaAutoPauseSettingsRefreshKey, func() (any, error) { + s.refreshOpenAIQuotaAutoPauseSettings(context.Background()) + return nil, nil + }) + if cached != nil { + return cached.settings // serve stale value while revalidating + } + return OpsOpenAIAccountQuotaAutoPauseSettings{} +} + +// WarmOpenAIQuotaAutoPauseSettings synchronously loads the quota auto-pause settings +// into the in-memory cache. Useful for application startup (so the first request hits +// a warm cache) and for tests that need deterministic reads immediately after +// constructing the service. +func (s *SettingService) WarmOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + s.refreshOpenAIQuotaAutoPauseSettings(ctx) + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + if cached == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + return cached.settings +} + +// refreshOpenAIQuotaAutoPauseSettings reads the latest settings from the DB and stores +// them into the in-memory cache. On error it stores the prior value (or zero defaults +// if nothing is cached yet) with the shorter error TTL so the next refresh comes +// sooner. Always uses its own timeout-bounded context to keep refresh latency +// predictable regardless of the caller. +func (s *SettingService) refreshOpenAIQuotaAutoPauseSettings(ctx context.Context) { + if s == nil || s.settingRepo == nil { + return + } + dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) + defer cancel() + + settings := OpsOpenAIAccountQuotaAutoPauseSettings{} + ttl := openAIQuotaAutoPauseSettingsCacheTTL + raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) + if err == nil { + cfg := defaultOpsAdvancedSettings() + if strings.TrimSpace(raw) != "" { + if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { + normalizeOpsAdvancedSettings(cfg) + } } + settings = cfg.OpenAIAccountQuotaAutoPause + } else if !errors.Is(err, ErrSettingNotFound) { + // Real error: keep serving prior value but refresh sooner. + if prior, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); prior != nil { + settings = prior.settings + } + ttl = openAIQuotaAutoPauseSettingsErrorTTL } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - result, _, _ := openAIQuotaAutoPauseSettingsSF.Do(cacheKey, func() (any, error) { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings, nil - } - } - - settings := OpsOpenAIAccountQuotaAutoPauseSettings{} - ttl := openAIQuotaAutoPauseSettingsCacheTTL - if s != nil && s.settingRepo != nil { - dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) - defer cancel() - raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) - if err == nil { - cfg := defaultOpsAdvancedSettings() - if strings.TrimSpace(raw) != "" { - if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { - normalizeOpsAdvancedSettings(cfg) - } - } - settings = cfg.OpenAIAccountQuotaAutoPause - } else { - ttl = openAIQuotaAutoPauseSettingsErrorTTL - } - } - - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: settings, - expiresAt: time.Now().Add(ttl).UnixNano(), - }) - return settings, nil + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(ttl).UnixNano(), }) +} - settings, _ := result.(OpsOpenAIAccountQuotaAutoPauseSettings) - return settings +// SetOpenAIQuotaAutoPauseSettings writes the given settings directly into the in-memory +// cache. Called from settings-write code paths so that the next read reflects the new +// value immediately, without waiting for the background refresh. +func (s *SettingService) SetOpenAIQuotaAutoPauseSettings(settings OpsOpenAIAccountQuotaAutoPauseSettings) { + if s == nil { + return + } + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), + }) } // GetRectifierSettings 获取请求整流器配置 diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index b22e10ae..d3e4ce51 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -396,6 +396,46 @@ func ProvideBackupService( return svc } +// ProvideOpsService constructs OpsService and wires the SettingService-backed quota +// auto-pause cache sink. Mirrors the SetCleanupReloader pattern: OpsService doesn't +// hold a *SettingService reference, but wire injects a tiny callback so writes to +// ops_advanced_settings immediately propagate into the scheduler hot-path cache. +func ProvideOpsService( + opsRepo OpsRepository, + settingRepo SettingRepository, + cfg *config.Config, + accountRepo AccountRepository, + userRepo UserRepository, + concurrencyService *ConcurrencyService, + gatewayService *GatewayService, + openAIGatewayService *OpenAIGatewayService, + geminiCompatService *GeminiMessagesCompatService, + antigravityGatewayService *AntigravityGatewayService, + systemLogSink *OpsSystemLogSink, + settingService *SettingService, +) *OpsService { + svc := NewOpsService( + opsRepo, + settingRepo, + cfg, + accountRepo, + userRepo, + concurrencyService, + gatewayService, + openAIGatewayService, + geminiCompatService, + antigravityGatewayService, + systemLogSink, + ) + if settingService != nil { + svc.SetOpenAIQuotaAutoPauseSettingsSink(settingService.SetOpenAIQuotaAutoPauseSettings) + // Optional warm-up so the first scheduled request after process start observes + // a populated cache rather than zero defaults. Best-effort, sync-bounded. + settingService.WarmOpenAIQuotaAutoPauseSettings(context.Background()) + } + return svc +} + // ProvideSettingService wires SettingService with group reader and proxy repo. func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, proxyRepo ProxyRepository, cfg *config.Config) *SettingService { svc := NewSettingService(settingRepo, cfg) @@ -481,7 +521,7 @@ var ProviderSet = wire.NewSet( NewDataManagementService, ProvideBackupService, ProvideOpsSystemLogSink, - NewOpsService, + ProvideOpsService, ProvideOpsMetricsCollector, ProvideOpsAggregationService, ProvideOpsAlertEvaluatorService, diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 470c0bfb..8dc85d0e 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -1791,6 +1791,28 @@ v-if="account?.platform === 'openai'" class="border-t border-gray-200 pt-4 dark:border-dark-600 space-y-4" > +
+
+ + +
+

{{ t('admin.accounts.autoPauseDisabledHint') }}

+

{{ t('admin.accounts.autoPauseThresholdHint') }}

+
+
+ + +
+

{{ t('admin.accounts.autoPauseDisabledHint') }}

+

{{ t('admin.accounts.autoPauseThresholdHint') }}

@@ -2481,6 +2527,8 @@ const interceptWarmupRequests = ref(false) const autoPauseOnExpired = ref(false) const autoPause5hThreshold = ref(null) const autoPause7dThreshold = ref(null) +const autoPause5hDisabled = ref(false) +const autoPause7dDisabled = ref(false) const mixedScheduling = ref(false) // For antigravity accounts: enable mixed scheduling const allowOverages = ref(false) // For antigravity accounts: enable AI Credits overages const antigravityModelRestrictionMode = ref<'whitelist' | 'mapping'>('whitelist') @@ -2901,6 +2949,8 @@ const syncFormFromAccount = (newAccount: Account | null) => { allowOverages.value = extra?.allow_overages === true autoPause5hThreshold.value = typeof extra?.auto_pause_5h_threshold === 'number' ? extra.auto_pause_5h_threshold * 100 : null autoPause7dThreshold.value = typeof extra?.auto_pause_7d_threshold === 'number' ? extra.auto_pause_7d_threshold * 100 : null + autoPause5hDisabled.value = extra?.auto_pause_5h_disabled === true + autoPause7dDisabled.value = extra?.auto_pause_7d_disabled === true // Load OpenAI passthrough toggle (OpenAI OAuth/API Key) openaiPassthroughEnabled.value = false @@ -4064,6 +4114,16 @@ const handleSubmit = async () => { } else { delete newExtra.auto_pause_7d_threshold } + if (autoPause5hDisabled.value) { + newExtra.auto_pause_5h_disabled = true + } else { + delete newExtra.auto_pause_5h_disabled + } + if (autoPause7dDisabled.value) { + newExtra.auto_pause_7d_disabled = true + } else { + delete newExtra.auto_pause_7d_disabled + } delete newExtra.codex_image_generation_bridge_enabled if (codexImageGenerationBridgeMode.value === 'inherit') { diff --git a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts index 6db63831..f4865de9 100644 --- a/frontend/src/components/account/__tests__/EditAccountModal.spec.ts +++ b/frontend/src/components/account/__tests__/EditAccountModal.spec.ts @@ -352,6 +352,27 @@ describe('EditAccountModal', () => { expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_7d_threshold).toBe(0.96) }) + it('submits OpenAI quota auto-pause disable flag in extra', async () => { + // Toggling the per-account disable flag must persist as auto_pause_5h_disabled + // so an admin can exempt one account from auto-pause even when a global default + // threshold is configured (otherwise leaving the threshold blank would silently + // fall back to the global default). + const account = buildAccount() + updateAccountMock.mockReset() + checkMixedChannelRiskMock.mockReset() + checkMixedChannelRiskMock.mockResolvedValue({ has_risk: false }) + updateAccountMock.mockResolvedValue(account) + + const wrapper = mountModal(account) + + await wrapper.get('[data-testid="auto-pause-5h-disabled"]').trigger('click') + await wrapper.get('form#edit-account-form').trigger('submit.prevent') + + expect(updateAccountMock).toHaveBeenCalledTimes(1) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_5h_disabled).toBe(true) + expect(updateAccountMock.mock.calls[0]?.[1]?.extra?.auto_pause_7d_disabled).toBeUndefined() + }) + it('keeps at least one OpenAI APIKey endpoint capability selected', async () => { const account = buildAccount() updateAccountMock.mockReset() diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index 8ab90961..6735029c 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -3477,7 +3477,10 @@ export default { autoPauseOnExpiredDesc: 'When enabled, the account will auto pause scheduling after it expires', autoPause5hThreshold: '5h Usage Threshold (%)', autoPause7dThreshold: '7d Usage Threshold (%)', - autoPauseThresholdHint: 'Leave empty or set 0 to disable. Reaching the threshold only skips the account during scheduling and does not modify schedulable.', + autoPauseThresholdHint: 'Leave empty or set 0 to use the global default threshold (configured in Ops settings); set a value to override the global default. Reaching the threshold only skips the account during scheduling and does not modify schedulable.', + autoPause5hDisabled: 'Disable 5h auto-pause', + autoPause7dDisabled: 'Disable 7d auto-pause', + autoPauseDisabledHint: 'When enabled, this account is never auto-paused (even if a global default threshold is configured).', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: 'Quota Control', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 4f1d1f13..abb8dff7 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -3615,7 +3615,10 @@ export default { autoPauseOnExpiredDesc: '启用后,账号过期将自动暂停调度', autoPause5hThreshold: '5h 用量阈值(%)', autoPause7dThreshold: '7d 用量阈值(%)', - autoPauseThresholdHint: '填 0 或留空表示不启用;达到阈值后仅在调度时跳过账号,不修改 schedulable。', + autoPauseThresholdHint: '留空或填 0 表示使用全局默认阈值(在运维设置中配置);填具体值则覆盖全局默认。达到阈值后仅在调度时跳过账号,不修改 schedulable。', + autoPause5hDisabled: '禁用 5h 自动暂停', + autoPause7dDisabled: '禁用 7d 自动暂停', + autoPauseDisabledHint: '开启后该账号永不进入自动暂停(即使全局默认阈值已配置)。', // Quota control (Anthropic OAuth/SetupToken only) quotaControl: { title: '配额控制',