diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 465f5e25..6e8be8fc 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -195,7 +195,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService, serviceUserPlatformQuotaRepository) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) opsSystemLogSink := service.ProvideOpsSystemLogSink(opsRepository) - opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink) + opsService := service.ProvideOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink, settingService) encryptionKey, err := payment.ProvideEncryptionKey(configConfig) if err != nil { return nil, err diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index ff3c4301..cf19deda 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -557,6 +557,8 @@ func filterSchedulerExtra(extra map[string]any) map[string]any { "codex_usage_updated_at", "auto_pause_5h_threshold", "auto_pause_7d_threshold", + "auto_pause_5h_disabled", + "auto_pause_7d_disabled", } filtered := make(map[string]any) for _, key := range keys { diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index 9e4ec23e..a4667591 100644 --- a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -89,6 +89,8 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { "codex_usage_updated_at": "2026-05-29T09:00:00Z", "auto_pause_5h_threshold": 0.95, "auto_pause_7d_threshold": 0.96, + "auto_pause_5h_disabled": true, + "auto_pause_7d_disabled": false, }, } @@ -103,4 +105,6 @@ func TestBuildSchedulerMetadataAccount_KeepsQuotaAutoPauseFields(t *testing.T) { require.Equal(t, "2026-05-29T09:00:00Z", got.Extra["codex_usage_updated_at"]) require.Equal(t, 0.95, got.Extra["auto_pause_5h_threshold"]) require.Equal(t, 0.96, got.Extra["auto_pause_7d_threshold"]) + require.Equal(t, true, got.Extra["auto_pause_5h_disabled"]) + require.Equal(t, false, got.Extra["auto_pause_7d_disabled"]) } diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index fd28fa86..47a8142a 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -974,6 +974,13 @@ func (s *defaultOpenAIAccountScheduler) isAccountRequestCompatible(ctx context.C if s != nil && s.service != nil && s.service.isOpenAIAccountRuntimeBlocked(account) { return false } + // Quota auto-pause must be evaluated during the initial filter too. Without it the + // TopK candidate pool can be filled with paused accounts and the later fresh/DB + // rechecks won't reach healthy accounts that fell outside TopK — manifesting as + // "no available accounts" even though healthy ones exist. + if paused, _ := shouldAutoPauseOpenAIAccountByQuota(ctx, account); paused { + return false + } if req.RequestedModel != "" && !account.IsModelSupported(req.RequestedModel) { return false } diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 531769a7..da5f0a66 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -798,6 +798,63 @@ func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_UsesGlobalDefa require.Equal(t, int64(35402), account.ID) } +// Regression: a per-account explicit-disable flag exempts the account from auto-pause +// even when a global default threshold is set. Without this, "leave threshold blank" +// silently falls back to global default and admins have no way to whitelist a single +// account. +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerAccountDisableOverridesGlobalDefault(t *testing.T) { + ctx := withOpenAIQuotaAutoPauseSettings(context.Background(), OpsOpenAIAccountQuotaAutoPauseSettings{DefaultThreshold5h: 0.95}) + // Account has high usage AND no per-account threshold (would normally fall back to + // the global default and get paused), but the explicit disable flag is set. + primary := Account{ + ID: 35701, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "auto_pause_5h_disabled": true, + }, + } + secondary := Account{ID: 35702, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35701), account.ID) +} + +// Disable is per-window: disabling only 5h must still allow 7d auto-pause to fire. +func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_PerWindowDisableScoped(t *testing.T) { + ctx := context.Background() + primary := Account{ + ID: 35801, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 99.0, + "codex_7d_used_percent": 99.0, + "auto_pause_5h_disabled": true, + "auto_pause_7d_threshold": 0.95, + }, + } + secondary := Account{ID: 35802, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 5} + svc := &OpenAIGatewayService{accountRepo: schedulerTestOpenAIAccountRepo{accounts: []Account{primary, secondary}}, cfg: &config.Config{}} + + account, err := svc.SelectAccountForModelWithExclusions(ctx, nil, "", "gpt-5.1", nil) + require.NoError(t, err) + require.NotNil(t, account) + require.Equal(t, int64(35802), account.ID, "7d auto-pause must still fire even though 5h is disabled") +} + func TestOpenAIGatewayService_SelectAccountForModelWithExclusions_StaleUsageWindowResetSkipsPause(t *testing.T) { ctx := context.Background() // Usage is over threshold but the window's reset time has already passed, so the @@ -1399,6 +1456,85 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKFallback } } +// Regression: TopK initial filter must drop quota-auto-paused accounts. Otherwise +// the candidate pool is filled with paused accounts, healthy accounts fall outside +// TopK, and the scheduler returns "no available accounts" even though healthy ones +// exist. +func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKExcludesQuotaPaused(t *testing.T) { + ctx := context.Background() + groupID := int64(110) + accounts := []Account{ + { + ID: 37001, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{ + "codex_5h_used_percent": 96.0, + "auto_pause_5h_threshold": 0.95, + }, + }, + { + ID: 37002, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 5, + }, + } + + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.LBTopK = 1 // TopK=1 makes the bug fatal: paused account would crowd out the healthy one entirely + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Priority = 0.4 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Load = 1.0 + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue = 1.0 + + concurrencyCache := schedulerTestConcurrencyCache{ + loadMap: map[int64]*AccountLoadInfo{ + 37001: {AccountID: 37001, LoadRate: 5, WaitingCount: 0}, + 37002: {AccountID: 37002, LoadRate: 5, WaitingCount: 0}, + }, + acquireResults: map[int64]bool{ + 37002: true, + }, + } + + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + selection, decision, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.1", + nil, + OpenAIUpstreamTransportAny, + false, + ) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(37002), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + // Only the healthy account should ever enter the candidate pool; the paused one + // must be filtered out at the initial-filter stage. + require.Equal(t, 1, decision.CandidateCount) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + func TestOpenAIGatewayService_OpenAIAccountSchedulerMetrics(t *testing.T) { ctx := context.Background() groupID := int64(12) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e2534cc2..b1ae6a9a 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1360,14 +1360,21 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) if account == nil || !account.IsOpenAI() { return false, openAIQuotaAutoPauseDecision{} } + // Per-account explicit-disable flags must take precedence over the global default. + // Without these, leaving the account threshold blank means "use global default", + // so an admin has no way to exempt a single account from auto-pause once a global + // default exists. The disable flag is per-window so an account can opt out of + // only 5h or only 7d auto-pause. + disabled5h := resolveAccountExtraBool(account.Extra, "auto_pause_5h_disabled") + disabled7d := resolveAccountExtraBool(account.Extra, "auto_pause_7d_disabled") threshold5h, threshold7d := resolveOpenAIQuotaAutoPauseThresholds(ctx, account) now := time.Now() - if threshold5h > 0 { + if !disabled5h && threshold5h > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "5h", now); ok && utilization >= threshold5h { return true, openAIQuotaAutoPauseDecision{window: "5h", threshold: threshold5h, utilization: utilization} } } - if threshold7d > 0 { + if !disabled7d && threshold7d > 0 { if utilization, ok := resolveOpenAIQuotaUtilization(account.Extra, "7d", now); ok && utilization >= threshold7d { return true, openAIQuotaAutoPauseDecision{window: "7d", threshold: threshold7d, utilization: utilization} } @@ -1375,6 +1382,39 @@ func shouldAutoPauseOpenAIAccountByQuota(ctx context.Context, account *Account) return false, openAIQuotaAutoPauseDecision{} } +// resolveAccountExtraBool reads a bool-like value from account extra, tolerating +// the few shapes JSON unmarshalling may produce (real bool, "true"/"false" +// strings, 0/1 numbers). +func resolveAccountExtraBool(extra map[string]any, key string) bool { + if len(extra) == 0 { + return false + } + value, ok := extra[key] + if !ok || value == nil { + return false + } + switch v := value.(type) { + case bool: + return v + case string: + parsed, err := strconv.ParseBool(strings.TrimSpace(v)) + return err == nil && parsed + case float64: + return v != 0 + case float32: + return v != 0 + case int: + return v != 0 + case int64: + return v != 0 + case json.Number: + if i, err := v.Int64(); err == nil { + return i != 0 + } + } + return false +} + func resolveOpenAIQuotaAutoPauseThresholds(ctx context.Context, account *Account) (float64, float64) { threshold5h, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_5h_threshold") threshold7d, _ := resolveAccountExtraNumber(account.Extra, "auto_pause_7d_threshold") diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 1cea72fa..2d7c5bd4 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -41,6 +41,11 @@ type OpsService struct { // cleanupReloader 由 wire 在 OpsCleanupService 构造完成后通过 SetCleanupReloader 注入。 // 解耦避免 OpsService -> OpsCleanupService 的硬依赖(cleanup 也读 settings,会循环)。 cleanupReloader CleanupReloader + + // quotaAutoPauseSink 由 wire 注入(通常是 SettingService.SetOpenAIQuotaAutoPauseSettings)。 + // UpdateOpsAdvancedSettings 写入新配置后调用,把最新的 quota auto-pause 全局默认阈值 + // 立即同步到调度热路径读取的内存缓存,避免下次请求才能感知新值。 + quotaAutoPauseSink func(OpsOpenAIAccountQuotaAutoPauseSettings) } // CleanupReloader 由 OpsCleanupService 实现。 @@ -57,6 +62,16 @@ func (s *OpsService) SetCleanupReloader(r CleanupReloader) { s.cleanupReloader = r } +// SetOpenAIQuotaAutoPauseSettingsSink 由 wire 注入,把最新的 quota auto-pause 全局默认 +// 阈值 push 到调度热路径读取的内存缓存。同 SetCleanupReloader 的解耦目的:避免 OpsService +// 持有 *SettingService 引入循环依赖。 +func (s *OpsService) SetOpenAIQuotaAutoPauseSettingsSink(sink func(OpsOpenAIAccountQuotaAutoPauseSettings)) { + if s == nil { + return + } + s.quotaAutoPauseSink = sink +} + func NewOpsService( opsRepo OpsRepository, settingRepo SettingRepository, diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index 23e92e5a..472f4e32 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -490,12 +490,12 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva if err := s.settingRepo.Set(ctx, SettingKeyOpsAdvancedSettings, string(raw)); err != nil { return nil, err } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: cfg.OpenAIAccountQuotaAutoPause, - expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), - }) + // Push the new quota auto-pause settings straight into the in-memory cache that + // the OpenAI scheduling hot path reads, so the next request observes the new value + // without waiting for the background refresher's TTL. + if s.quotaAutoPauseSink != nil { + s.quotaAutoPauseSink(cfg.OpenAIAccountQuotaAutoPause) + } // notify cleanup service to reload schedule/enabled. if s.cleanupReloader != nil { diff --git a/backend/internal/service/ops_settings_advanced_test.go b/backend/internal/service/ops_settings_advanced_test.go index d8598fe0..62803f94 100644 --- a/backend/internal/service/ops_settings_advanced_test.go +++ b/backend/internal/service/ops_settings_advanced_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/Wei-Shaw/sub2api/internal/config" ) @@ -103,11 +104,58 @@ func TestGetOpenAIQuotaAutoPauseSettings_ReadsDefaultsFromOpsAdvancedSettings(t repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.95,"default_threshold_7d":0.9}}` svc := NewSettingService(repo, &config.Config{}) - settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + // Warm the in-memory cache synchronously so the assertion below is deterministic. + // GetOpenAIQuotaAutoPauseSettings is non-blocking on the hot path (returns the + // cached value, refreshes asynchronously); for tests and startup, Warm is the + // synchronous entry point that guarantees a populated cache. + settings := svc.WarmOpenAIQuotaAutoPauseSettings(context.Background()) if settings.DefaultThreshold5h != 0.95 { t.Fatalf("DefaultThreshold5h = %v, want 0.95", settings.DefaultThreshold5h) } if settings.DefaultThreshold7d != 0.9 { t.Fatalf("DefaultThreshold7d = %v, want 0.9", settings.DefaultThreshold7d) } + + // Subsequent Get must hit the warm cache and return the same value without any DB + // access — that's the hot-path invariant. + cached := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if cached.DefaultThreshold5h != 0.95 || cached.DefaultThreshold7d != 0.9 { + t.Fatalf("cached read = %+v, want {0.95, 0.9}", cached) + } +} + +// Hot-path invariant: a Get with cold cache must return immediately (zero defaults) +// rather than blocking on the DB. The async refresher will populate the cache for +// subsequent calls. +func TestGetOpenAIQuotaAutoPauseSettings_ColdCacheNonBlocking(t *testing.T) { + repo := newRuntimeSettingRepoStub() + repo.values[SettingKeyOpsAdvancedSettings] = `{"openai_account_quota_auto_pause":{"default_threshold_5h":0.7}}` + svc := NewSettingService(repo, &config.Config{}) + + start := time.Now() + settings := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + elapsed := time.Since(start) + if elapsed > 50*time.Millisecond { + t.Fatalf("cold-cache Get must be non-blocking, took %v", elapsed) + } + // Cold cache means we get zero defaults (the async refresh hasn't completed yet). + if settings.DefaultThreshold5h != 0 || settings.DefaultThreshold7d != 0 { + t.Fatalf("cold-cache Get = %+v, want zeroes", settings) + } +} + +// Explicit cache write (e.g. from UpdateOpsAdvancedSettings) must be visible on the +// very next read without any DB roundtrip. +func TestSetOpenAIQuotaAutoPauseSettings_VisibleImmediately(t *testing.T) { + svc := NewSettingService(newRuntimeSettingRepoStub(), &config.Config{}) + + svc.SetOpenAIQuotaAutoPauseSettings(OpsOpenAIAccountQuotaAutoPauseSettings{ + DefaultThreshold5h: 0.88, + DefaultThreshold7d: 0.77, + }) + + got := svc.GetOpenAIQuotaAutoPauseSettings(context.Background()) + if got.DefaultThreshold5h != 0.88 || got.DefaultThreshold7d != 0.77 { + t.Fatalf("after Set, Get = %+v, want {0.88, 0.77}", got) + } } diff --git a/backend/internal/service/setting_service.go b/backend/internal/service/setting_service.go index 40e98b88..98acdb80 100644 --- a/backend/internal/service/setting_service.go +++ b/backend/internal/service/setting_service.go @@ -14,7 +14,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -162,28 +161,7 @@ const openAIQuotaAutoPauseSettingsCacheTTL = 60 * time.Second const openAIQuotaAutoPauseSettingsErrorTTL = 5 * time.Second const openAIQuotaAutoPauseSettingsDBTimeout = 5 * time.Second -var openAIQuotaAutoPauseSettingsCache sync.Map // map[string]*cachedOpenAIQuotaAutoPauseSettings -var openAIQuotaAutoPauseSettingsSF singleflight.Group - -func openAIQuotaAutoPauseSettingsCacheKey(repo SettingRepository) string { - if repo == nil { - return "nil" - } - return fmt.Sprintf("%T:%p", repo, repo) -} - -func loadOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository) (*cachedOpenAIQuotaAutoPauseSettings, bool) { - value, ok := openAIQuotaAutoPauseSettingsCache.Load(openAIQuotaAutoPauseSettingsCacheKey(repo)) - if !ok || value == nil { - return nil, false - } - cached, ok := value.(*cachedOpenAIQuotaAutoPauseSettings) - return cached, ok && cached != nil -} - -func storeOpenAIQuotaAutoPauseSettingsCache(repo SettingRepository, cached *cachedOpenAIQuotaAutoPauseSettings) { - openAIQuotaAutoPauseSettingsCache.Store(openAIQuotaAutoPauseSettingsCacheKey(repo), cached) -} +const openAIQuotaAutoPauseSettingsRefreshKey = "openai_quota_auto_pause_settings" // DefaultSubscriptionGroupReader validates group references used by default subscriptions. type DefaultSubscriptionGroupReader interface { @@ -209,6 +187,15 @@ type SettingService struct { openAICodexUASF singleflight.Group openAIAllowCodexPluginCache atomic.Value // *cachedOpenAIAllowCodexPlugin openAIAllowCodexPluginSF singleflight.Group + + // openAIQuotaAutoPauseSettingsCache holds the most recently observed quota auto-pause + // settings. GetOpenAIQuotaAutoPauseSettings reads this atomic.Value on the request hot + // path without ever blocking on the DB; when the cached entry expires, a background + // goroutine refreshes it via openAIQuotaAutoPauseSettingsSF (stale-while-revalidate). + // This per-service field also gives tests natural isolation — each SettingService + // instance owns its own cache, no shared package-level state. + openAIQuotaAutoPauseSettingsCache atomic.Value // *cachedOpenAIQuotaAutoPauseSettings + openAIQuotaAutoPauseSettingsSF singleflight.Group } // DefaultPlatformQuotaSetting 单 platform 三档限额(nil = 沿用上层;0 = 显式禁用;>0 = 上限) @@ -2060,9 +2047,17 @@ func (s *SettingService) refreshCachedSettings(settings *SystemSettings) { enabled: settings.OpenAIAdvancedSchedulerEnabled, expiresAt: time.Now().Add(openAIAdvancedSchedulerSettingCacheTTL).UnixNano(), }) - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - openAIQuotaAutoPauseSettingsSF.Forget(cacheKey) - openAIQuotaAutoPauseSettingsCache.Delete(cacheKey) + // Invalidate the quota auto-pause cache and let the next read trigger a fresh load. + // We can't know from here whether ops_advanced_settings was also touched, so be + // defensive: store an expired entry — GetOpenAIQuotaAutoPauseSettings will serve + // stale and kick off an async refresh, never blocking the request that follows. + s.openAIQuotaAutoPauseSettingsSF.Forget(openAIQuotaAutoPauseSettingsRefreshKey) + if cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); cached != nil { + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: cached.settings, + expiresAt: 0, + }) + } if s.cfg != nil { s.cfg.SetTrustForwardedIPForAPIKeyACL(settings.APIKeyACLTrustForwardedIP) } @@ -4484,49 +4479,104 @@ func (s *SettingService) GetClaudeCodeVersionBounds(ctx context.Context) (min, m return b.min, b.max } +// GetOpenAIQuotaAutoPauseSettings returns the current global default quota auto-pause +// settings. It is invoked on the OpenAI scheduling hot path (once per request) and is +// therefore designed to never block on the DB: +// +// - Fresh cached value → returned immediately. +// - Stale or empty cache → the last known value is returned, and a background +// goroutine refreshes the cache via singleflight (stale-while-revalidate). +// - First call with no cache yet → zero defaults are returned and the same async +// refresh is kicked off; the next call gets the freshly populated value. +// +// Callers that need the freshly persisted value synchronously (tests, post-update +// confirmation, optional startup warm-up) should call WarmOpenAIQuotaAutoPauseSettings. func (s *SettingService) GetOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + now := time.Now().UnixNano() + if cached != nil && now < cached.expiresAt { + return cached.settings + } + // Stale or unset: trigger background refresh without blocking this request. + // singleflight.DoChan dedupes concurrent refreshes; we deliberately ignore the + // returned channel — the result is observable via the atomic cache. + s.openAIQuotaAutoPauseSettingsSF.DoChan(openAIQuotaAutoPauseSettingsRefreshKey, func() (any, error) { + s.refreshOpenAIQuotaAutoPauseSettings(context.Background()) + return nil, nil + }) + if cached != nil { + return cached.settings // serve stale value while revalidating + } + return OpsOpenAIAccountQuotaAutoPauseSettings{} +} + +// WarmOpenAIQuotaAutoPauseSettings synchronously loads the quota auto-pause settings +// into the in-memory cache. Useful for application startup (so the first request hits +// a warm cache) and for tests that need deterministic reads immediately after +// constructing the service. +func (s *SettingService) WarmOpenAIQuotaAutoPauseSettings(ctx context.Context) OpsOpenAIAccountQuotaAutoPauseSettings { + if s == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + s.refreshOpenAIQuotaAutoPauseSettings(ctx) + cached, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings) + if cached == nil { + return OpsOpenAIAccountQuotaAutoPauseSettings{} + } + return cached.settings +} + +// refreshOpenAIQuotaAutoPauseSettings reads the latest settings from the DB and stores +// them into the in-memory cache. On error it stores the prior value (or zero defaults +// if nothing is cached yet) with the shorter error TTL so the next refresh comes +// sooner. Always uses its own timeout-bounded context to keep refresh latency +// predictable regardless of the caller. +func (s *SettingService) refreshOpenAIQuotaAutoPauseSettings(ctx context.Context) { + if s == nil || s.settingRepo == nil { + return + } + dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) + defer cancel() + + settings := OpsOpenAIAccountQuotaAutoPauseSettings{} + ttl := openAIQuotaAutoPauseSettingsCacheTTL + raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) + if err == nil { + cfg := defaultOpsAdvancedSettings() + if strings.TrimSpace(raw) != "" { + if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { + normalizeOpsAdvancedSettings(cfg) + } } + settings = cfg.OpenAIAccountQuotaAutoPause + } else if !errors.Is(err, ErrSettingNotFound) { + // Real error: keep serving prior value but refresh sooner. + if prior, _ := s.openAIQuotaAutoPauseSettingsCache.Load().(*cachedOpenAIQuotaAutoPauseSettings); prior != nil { + settings = prior.settings + } + ttl = openAIQuotaAutoPauseSettingsErrorTTL } - cacheKey := openAIQuotaAutoPauseSettingsCacheKey(s.settingRepo) - result, _, _ := openAIQuotaAutoPauseSettingsSF.Do(cacheKey, func() (any, error) { - if cached, ok := loadOpenAIQuotaAutoPauseSettingsCache(s.settingRepo); ok { - if time.Now().UnixNano() < cached.expiresAt { - return cached.settings, nil - } - } - - settings := OpsOpenAIAccountQuotaAutoPauseSettings{} - ttl := openAIQuotaAutoPauseSettingsCacheTTL - if s != nil && s.settingRepo != nil { - dbCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), openAIQuotaAutoPauseSettingsDBTimeout) - defer cancel() - raw, err := s.settingRepo.GetValue(dbCtx, SettingKeyOpsAdvancedSettings) - if err == nil { - cfg := defaultOpsAdvancedSettings() - if strings.TrimSpace(raw) != "" { - if jsonErr := json.Unmarshal([]byte(raw), cfg); jsonErr == nil { - normalizeOpsAdvancedSettings(cfg) - } - } - settings = cfg.OpenAIAccountQuotaAutoPause - } else { - ttl = openAIQuotaAutoPauseSettingsErrorTTL - } - } - - storeOpenAIQuotaAutoPauseSettingsCache(s.settingRepo, &cachedOpenAIQuotaAutoPauseSettings{ - settings: settings, - expiresAt: time.Now().Add(ttl).UnixNano(), - }) - return settings, nil + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(ttl).UnixNano(), }) +} - settings, _ := result.(OpsOpenAIAccountQuotaAutoPauseSettings) - return settings +// SetOpenAIQuotaAutoPauseSettings writes the given settings directly into the in-memory +// cache. Called from settings-write code paths so that the next read reflects the new +// value immediately, without waiting for the background refresh. +func (s *SettingService) SetOpenAIQuotaAutoPauseSettings(settings OpsOpenAIAccountQuotaAutoPauseSettings) { + if s == nil { + return + } + s.openAIQuotaAutoPauseSettingsCache.Store(&cachedOpenAIQuotaAutoPauseSettings{ + settings: settings, + expiresAt: time.Now().Add(openAIQuotaAutoPauseSettingsCacheTTL).UnixNano(), + }) } // GetRectifierSettings 获取请求整流器配置 diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index b22e10ae..d3e4ce51 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -396,6 +396,46 @@ func ProvideBackupService( return svc } +// ProvideOpsService constructs OpsService and wires the SettingService-backed quota +// auto-pause cache sink. Mirrors the SetCleanupReloader pattern: OpsService doesn't +// hold a *SettingService reference, but wire injects a tiny callback so writes to +// ops_advanced_settings immediately propagate into the scheduler hot-path cache. +func ProvideOpsService( + opsRepo OpsRepository, + settingRepo SettingRepository, + cfg *config.Config, + accountRepo AccountRepository, + userRepo UserRepository, + concurrencyService *ConcurrencyService, + gatewayService *GatewayService, + openAIGatewayService *OpenAIGatewayService, + geminiCompatService *GeminiMessagesCompatService, + antigravityGatewayService *AntigravityGatewayService, + systemLogSink *OpsSystemLogSink, + settingService *SettingService, +) *OpsService { + svc := NewOpsService( + opsRepo, + settingRepo, + cfg, + accountRepo, + userRepo, + concurrencyService, + gatewayService, + openAIGatewayService, + geminiCompatService, + antigravityGatewayService, + systemLogSink, + ) + if settingService != nil { + svc.SetOpenAIQuotaAutoPauseSettingsSink(settingService.SetOpenAIQuotaAutoPauseSettings) + // Optional warm-up so the first scheduled request after process start observes + // a populated cache rather than zero defaults. Best-effort, sync-bounded. + settingService.WarmOpenAIQuotaAutoPauseSettings(context.Background()) + } + return svc +} + // ProvideSettingService wires SettingService with group reader and proxy repo. func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, proxyRepo ProxyRepository, cfg *config.Config) *SettingService { svc := NewSettingService(settingRepo, cfg) @@ -481,7 +521,7 @@ var ProviderSet = wire.NewSet( NewDataManagementService, ProvideBackupService, ProvideOpsSystemLogSink, - NewOpsService, + ProvideOpsService, ProvideOpsMetricsCollector, ProvideOpsAggregationService, ProvideOpsAlertEvaluatorService, diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 470c0bfb..8dc85d0e 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -1791,6 +1791,28 @@ v-if="account?.platform === 'openai'" class="border-t border-gray-200 pt-4 dark:border-dark-600 space-y-4" > +
{{ t('admin.accounts.autoPauseDisabledHint') }}
+{{ t('admin.accounts.autoPauseThresholdHint') }}
{{ t('admin.accounts.autoPauseDisabledHint') }}
+{{ t('admin.accounts.autoPauseThresholdHint') }}
@@ -2481,6 +2527,8 @@ const interceptWarmupRequests = ref(false) const autoPauseOnExpired = ref(false) const autoPause5hThreshold = ref