From a2ab67f8c771ead234d8194c1e7fd64834fb9a51 Mon Sep 17 00:00:00 2001 From: win Date: Wed, 29 Apr 2026 03:13:30 +0800 Subject: [PATCH] feat(scheduler): add P2C + quota-aware scheduling for OpenAI accounts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add GetQuotaRemainingFraction() to Account: returns [0,1] fraction of remaining quota; 1.0 when no positive limit is configured (unlimited accounts) - Add Quota float64 weight field to GatewayOpenAIWSSchedulerScoreWeights and EnableP2CScheduling bool to GatewayOpenAIWSConfig (both default off) - Extend selectByLoadBalance scoring with a quota factor weighted by Quota (a weight of 0 contributes nothing to the score) - Add selectByPowerOfTwo(): P2C selection, O(1) once candidates have been scored — samples 2 random candidates, tries the better-scored one first then the other, falls back to wait plan; activated when EnableP2CScheduling=true - Add openAIWSP2CEnabled() helper on OpenAIGatewayService - Add 6 tests covering quota fraction edge cases, P2C toggle, weight defaults, single-candidate P2C, two-candidate P2C selection, and quota score ordering --- backend/internal/config/config.go | 7 +- backend/internal/service/account.go | 18 +++ .../service/openai_account_scheduler.go | 103 +++++++++++++- .../service/openai_account_scheduler_test.go | 126 ++++++++++++++++++ 4 files changed, 252 insertions(+), 2 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 4d116313..276d76b9 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -657,6 +657,8 @@ type GatewayOpenAIWSConfig struct { // StickyPreviousResponseTTLSeconds: 兼容旧键(当新键未设置时回退) StickyPreviousResponseTTLSeconds int `mapstructure:"sticky_previous_response_ttl_seconds"` + // EnableP2CScheduling: 启用 Power-of-Two-Choices 调度(默认 false,使用 top-K 加权随机) + EnableP2CScheduling bool `mapstructure:"enable_p2c_scheduling"` SchedulerScoreWeights GatewayOpenAIWSSchedulerScoreWeights `mapstructure:"scheduler_score_weights"` } @@ -667,6 +669,8 @@ 
type GatewayOpenAIWSSchedulerScoreWeights struct { Queue float64 `mapstructure:"queue"` ErrorRate float64 `mapstructure:"error_rate"` TTFT float64 `mapstructure:"ttft"` + // Quota: 剩余配额比例权重(0 表示不参与打分) + Quota float64 `mapstructure:"quota"` } // GatewayUsageRecordConfig 使用量记录异步队列配置 @@ -2197,7 +2201,8 @@ func (c *Config) Validate() error { c.Gateway.OpenAIWS.SchedulerScoreWeights.Load < 0 || c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 || c.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate < 0 || - c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT < 0 { + c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT < 0 || + c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota < 0 { return fmt.Errorf("gateway.openai_ws.scheduler_score_weights.* must be non-negative") } weightSum := c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority + diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index feb1da37..9f875f23 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -1305,6 +1305,24 @@ func (a *Account) GetQuotaUsed() float64 { return a.getExtraFloat64("quota_used") } +// GetQuotaRemainingFraction returns the fraction of total quota remaining in [0,1]. +// Returns 1.0 when no quota limit is set (limit <= 0 is treated as unlimited). 
+func (a *Account) GetQuotaRemainingFraction() float64 { + limit := a.GetQuotaLimit() + if limit <= 0 { + return 1.0 + } + used := a.GetQuotaUsed() + remaining := (limit - used) / limit + if remaining < 0 { + return 0 + } + if remaining > 1 { + return 1 + } + return remaining +} + // GetQuotaDailyLimit 获取日额度限制(美元),0 表示未启用 func (a *Account) GetQuotaDailyLimit() float64 { return a.getExtraFloat64("quota_daily_limit") diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index 37e7ed2c..0b80a80e 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -672,12 +672,18 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( if item.hasTTFT && hasTTFTSample && maxTTFT > minTTFT { ttftFactor = 1 - clamp01((item.ttft-minTTFT)/(maxTTFT-minTTFT)) } + quotaFactor := item.account.GetQuotaRemainingFraction() item.score = weights.Priority*priorityFactor + weights.Load*loadFactor + weights.Queue*queueFactor + weights.ErrorRate*errorFactor + - weights.TTFT*ttftFactor + weights.TTFT*ttftFactor + + weights.Quota*quotaFactor + } + + if s.service.openAIWSP2CEnabled() { + return s.selectByPowerOfTwo(ctx, req, candidates, loadSkew) } topK := s.service.openAIWSLBTopK() @@ -888,6 +894,7 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul Queue: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue, ErrorRate: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate, TTFT: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT, + Quota: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota, } } return GatewayOpenAIWSSchedulerScoreWeightsView{ @@ -896,15 +903,21 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul Queue: 0.7, ErrorRate: 0.8, TTFT: 0.5, + Quota: 0.0, } } +func (s *OpenAIGatewayService) openAIWSP2CEnabled() bool { + return s != nil && s.cfg != nil && 
s.cfg.Gateway.OpenAIWS.EnableP2CScheduling +} + type GatewayOpenAIWSSchedulerScoreWeightsView struct { Priority float64 Load float64 Queue float64 ErrorRate float64 TTFT float64 + Quota float64 } func clamp01(value float64) float64 { @@ -918,6 +931,94 @@ func clamp01(value float64) float64 { } } +// selectByPowerOfTwo implements Power-of-Two-Choices (P2C): sample 2 random +// candidates and attempt the better-scored one first, then the other. +// This gives O(1) selection with load distribution comparable to top-K when N is large. +func (s *defaultOpenAIAccountScheduler) selectByPowerOfTwo( + ctx context.Context, + req OpenAIAccountScheduleRequest, + candidates []openAIAccountCandidateScore, + loadSkew float64, +) (*AccountSelectionResult, int, int, float64, error) { + n := len(candidates) + if n == 0 { + return nil, 0, 0, loadSkew, ErrNoAvailableAccounts + } + + rng := newOpenAISelectionRNG(deriveOpenAISelectionSeed(req)) + + // Pick two distinct random indices. + idxA := int(rng.nextUint64() % uint64(n)) + idxB := idxA + if n > 1 { + for idxB == idxA { + idxB = int(rng.nextUint64() % uint64(n)) + } + } + + // Order: better candidate first. 
+ first, second := candidates[idxA], candidates[idxB] + if isOpenAIAccountCandidateBetter(second, first) { + first, second = second, first + } + + tryAcquire := func(c openAIAccountCandidateScore) (*AccountSelectionResult, bool, error) { + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { + return nil, false, nil + } + fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { + return nil, false, nil + } + result, err := s.service.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) + if err != nil { + return nil, false, err + } + if result != nil && result.Acquired { + if req.SessionHash != "" { + _ = s.service.BindStickySession(ctx, req.GroupID, req.SessionHash, fresh.ID) + } + return &AccountSelectionResult{ + Account: fresh, + Acquired: true, + ReleaseFunc: result.ReleaseFunc, + }, true, nil + } + return nil, false, nil + } + + for _, c := range []openAIAccountCandidateScore{first, second} { + result, ok, err := tryAcquire(c) + if err != nil { + return nil, n, 2, loadSkew, err + } + if ok { + return result, n, 2, loadSkew, nil + } + } + + // Both slots busy — return wait plan on the better candidate. 
+ cfg := s.service.schedulingConfig() + for _, c := range []openAIAccountCandidateScore{first, second} { + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { + continue + } + return &AccountSelectionResult{ + Account: fresh, + WaitPlan: &AccountWaitPlan{ + AccountID: fresh.ID, + MaxConcurrency: fresh.Concurrency, + Timeout: cfg.FallbackWaitTimeout, + MaxWaiting: cfg.FallbackMaxWaiting, + }, + }, n, 2, loadSkew, nil + } + + return nil, n, 2, loadSkew, ErrNoAvailableAccounts +} + func calcLoadSkewByMoments(sum float64, sumSquares float64, count int) float64 { if count <= 1 { return 0 diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 088815ed..35d26e2b 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -966,3 +966,129 @@ func TestDefaultOpenAIAccountScheduler_IsAccountTransportCompatible_Branches(t * func int64PtrForTest(v int64) *int64 { return &v } + +func TestAccount_GetQuotaRemainingFraction(t *testing.T) { + // No limit configured → always 1.0 (unlimited) + noLimit := &Account{} + require.Equal(t, 1.0, noLimit.GetQuotaRemainingFraction()) + + // 50% used + half := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 50.0}} + require.InDelta(t, 0.5, half.GetQuotaRemainingFraction(), 1e-9) + + // Fully exhausted + full := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 100.0}} + require.Equal(t, 0.0, full.GetQuotaRemainingFraction()) + + // Over limit → clamp to 0 + over := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 150.0}} + require.Equal(t, 0.0, over.GetQuotaRemainingFraction()) + + // Fresh (0 used) + fresh := &Account{Extra: map[string]any{"quota_limit": 200.0, "quota_used": 0.0}} + require.Equal(t, 1.0, 
fresh.GetQuotaRemainingFraction()) +} + +func TestOpenAIGatewayService_P2CEnabled(t *testing.T) { + require.False(t, (*OpenAIGatewayService)(nil).openAIWSP2CEnabled()) + require.False(t, (&OpenAIGatewayService{}).openAIWSP2CEnabled()) + + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.EnableP2CScheduling = false + require.False(t, (&OpenAIGatewayService{cfg: cfg}).openAIWSP2CEnabled()) + + cfg.Gateway.OpenAIWS.EnableP2CScheduling = true + require.True(t, (&OpenAIGatewayService{cfg: cfg}).openAIWSP2CEnabled()) +} + +func TestOpenAIGatewayService_SchedulerWeights_QuotaField(t *testing.T) { + // Default weights: Quota is 0 (disabled by default) + svc := &OpenAIGatewayService{} + weights := svc.openAIWSSchedulerWeights() + require.Equal(t, 0.0, weights.Quota) + + // Config-driven quota weight + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota = 0.4 + svcWithCfg := &OpenAIGatewayService{cfg: cfg} + require.Equal(t, 0.4, svcWithCfg.openAIWSSchedulerWeights().Quota) +} + +func TestDefaultOpenAIAccountScheduler_SelectByPowerOfTwo_SingleCandidate(t *testing.T) { + ctx := context.Background() + groupID := int64(99001) + account := &Account{ID: 71001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0} + snapshotCache := &openAISnapshotCacheStub{ + snapshotAccounts: []*Account{account}, + accountsByID: map[int64]*Account{71001: account}, + } + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.EnableP2CScheduling = true + cfg.Gateway.OpenAIWS.LBTopK = 5 + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{*account}}, + cfg: cfg, + schedulerSnapshot: &SchedulerSnapshotService{cache: snapshotCache}, + concurrencyService: NewConcurrencyService(stubConcurrencyCache{}), + } + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "", "gpt-4o", nil, OpenAIUpstreamTransportAny) + require.NoError(t, err) + require.NotNil(t, selection) 
+ require.Equal(t, int64(71001), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) +} + +func TestDefaultOpenAIAccountScheduler_SelectByPowerOfTwo_PicksBetterCandidate(t *testing.T) { + ctx := context.Background() + groupID := int64(99002) + // Account A has low priority (better), B has high priority (worse). + // With P2C enabled and a deterministic seed, we should always get a valid selection. + accountA := &Account{ID: 72001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0} + accountB := &Account{ID: 72002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 10} + snapshotCache := &openAISnapshotCacheStub{ + snapshotAccounts: []*Account{accountA, accountB}, + accountsByID: map[int64]*Account{72001: accountA, 72002: accountB}, + } + cfg := &config.Config{} + cfg.Gateway.OpenAIWS.EnableP2CScheduling = true + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{*accountA, *accountB}}, + cfg: cfg, + schedulerSnapshot: &SchedulerSnapshotService{cache: snapshotCache}, + concurrencyService: NewConcurrencyService(stubConcurrencyCache{}), + } + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "", "gpt-4o", nil, OpenAIUpstreamTransportAny) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + // Either account is valid; just verify we got a schedulable one. + require.True(t, selection.Account.ID == 72001 || selection.Account.ID == 72002) +} + +func TestDefaultOpenAIAccountScheduler_QuotaFactorInfluencesScore(t *testing.T) { + // Verify that quota weight affects scoring by checking GetQuotaRemainingFraction is used. + // Account with high remaining quota should score higher when quota weight > 0. 
+ highQuota := &Account{ + ID: 73001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, + Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0, + Extra: map[string]any{"quota_limit": 100.0, "quota_used": 10.0}, // 90% remaining + } + lowQuota := &Account{ + ID: 73002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, + Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0, + Extra: map[string]any{"quota_limit": 100.0, "quota_used": 90.0}, // 10% remaining + } + + require.InDelta(t, 0.9, highQuota.GetQuotaRemainingFraction(), 1e-9) + require.InDelta(t, 0.1, lowQuota.GetQuotaRemainingFraction(), 1e-9) + + // With quota weight = 1.0 and all other weights = 0, high-quota account should win. + // We verify the score ordering directly using isOpenAIAccountCandidateBetter. + highScore := openAIAccountCandidateScore{account: highQuota, score: 0.9} + lowScore := openAIAccountCandidateScore{account: lowQuota, score: 0.1} + require.True(t, isOpenAIAccountCandidateBetter(highScore, lowScore)) + require.False(t, isOpenAIAccountCandidateBetter(lowScore, highScore)) +}