feat(scheduler): add P2C + quota-aware scheduling for OpenAI accounts

- Add GetQuotaRemainingFraction() to Account: returns [0,1] fraction of
  remaining quota; 1.0 when no limit is configured (unlimited accounts)
- Add Quota float64 weight field to GatewayOpenAIWSSchedulerScoreWeights
  and EnableP2CScheduling bool to GatewayOpenAIWSConfig (both default off)
- Extend selectByLoadBalance scoring with quota factor (gated by Quota>0)
- Add selectByPowerOfTwo(): O(1) P2C selection — samples 2 random candidates,
  tries the better-scored one first then the other, falls back to wait plan;
  activated when EnableP2CScheduling=true
- Add openAIWSP2CEnabled() helper on OpenAIGatewayService
- Add 6 tests covering quota fraction edge cases, P2C toggle, weight defaults,
  single-candidate P2C, two-candidate P2C selection, and quota score ordering
This commit is contained in:
win 2026-04-29 03:13:30 +08:00
parent d1e2d39c26
commit a2ab67f8c7
4 changed files with 252 additions and 2 deletions

View File

@ -657,6 +657,8 @@ type GatewayOpenAIWSConfig struct {
// StickyPreviousResponseTTLSeconds: 兼容旧键(当新键未设置时回退)
StickyPreviousResponseTTLSeconds int `mapstructure:"sticky_previous_response_ttl_seconds"`
// EnableP2CScheduling: 启用 Power-of-Two-Choices 调度(默认 false使用 top-K 加权随机)
EnableP2CScheduling bool `mapstructure:"enable_p2c_scheduling"`
SchedulerScoreWeights GatewayOpenAIWSSchedulerScoreWeights `mapstructure:"scheduler_score_weights"`
}
@ -667,6 +669,8 @@ type GatewayOpenAIWSSchedulerScoreWeights struct {
Queue float64 `mapstructure:"queue"`
ErrorRate float64 `mapstructure:"error_rate"`
TTFT float64 `mapstructure:"ttft"`
// Quota: 剩余配额比例权重0 表示不参与打分)
Quota float64 `mapstructure:"quota"`
}
// GatewayUsageRecordConfig 使用量记录异步队列配置
@ -2197,7 +2201,8 @@ func (c *Config) Validate() error {
c.Gateway.OpenAIWS.SchedulerScoreWeights.Load < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT < 0 {
c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota < 0 {
return fmt.Errorf("gateway.openai_ws.scheduler_score_weights.* must be non-negative")
}
weightSum := c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority +

View File

@ -1305,6 +1305,24 @@ func (a *Account) GetQuotaUsed() float64 {
return a.getExtraFloat64("quota_used")
}
// GetQuotaRemainingFraction returns the fraction of total quota remaining in [0,1].
// Returns 1.0 when no quota limit is set (limit == 0 means unlimited).
func (a *Account) GetQuotaRemainingFraction() float64 {
limit := a.GetQuotaLimit()
if limit <= 0 {
return 1.0
}
used := a.GetQuotaUsed()
remaining := (limit - used) / limit
if remaining < 0 {
return 0
}
if remaining > 1 {
return 1
}
return remaining
}
// GetQuotaDailyLimit 获取日额度限制美元0 表示未启用
func (a *Account) GetQuotaDailyLimit() float64 {
return a.getExtraFloat64("quota_daily_limit")

View File

@ -672,12 +672,18 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance(
if item.hasTTFT && hasTTFTSample && maxTTFT > minTTFT {
ttftFactor = 1 - clamp01((item.ttft-minTTFT)/(maxTTFT-minTTFT))
}
quotaFactor := item.account.GetQuotaRemainingFraction()
item.score = weights.Priority*priorityFactor +
weights.Load*loadFactor +
weights.Queue*queueFactor +
weights.ErrorRate*errorFactor +
weights.TTFT*ttftFactor
weights.TTFT*ttftFactor +
weights.Quota*quotaFactor
}
if s.service.openAIWSP2CEnabled() {
return s.selectByPowerOfTwo(ctx, req, candidates, loadSkew)
}
topK := s.service.openAIWSLBTopK()
@ -888,6 +894,7 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
Queue: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue,
ErrorRate: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate,
TTFT: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT,
Quota: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota,
}
}
return GatewayOpenAIWSSchedulerScoreWeightsView{
@ -896,15 +903,21 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
Queue: 0.7,
ErrorRate: 0.8,
TTFT: 0.5,
Quota: 0.0,
}
}
func (s *OpenAIGatewayService) openAIWSP2CEnabled() bool {
return s != nil && s.cfg != nil && s.cfg.Gateway.OpenAIWS.EnableP2CScheduling
}
type GatewayOpenAIWSSchedulerScoreWeightsView struct {
Priority float64
Load float64
Queue float64
ErrorRate float64
TTFT float64
Quota float64
}
func clamp01(value float64) float64 {
@ -918,6 +931,94 @@ func clamp01(value float64) float64 {
}
}
// selectByPowerOfTwo implements Power-of-Two-Choices (P2C): sample 2 random
// candidates and attempt the better-scored one first, then the other.
// This gives O(1) selection with load distribution comparable to top-K when N is large.
func (s *defaultOpenAIAccountScheduler) selectByPowerOfTwo(
ctx context.Context,
req OpenAIAccountScheduleRequest,
candidates []openAIAccountCandidateScore,
loadSkew float64,
) (*AccountSelectionResult, int, int, float64, error) {
n := len(candidates)
if n == 0 {
return nil, 0, 0, loadSkew, ErrNoAvailableAccounts
}
rng := newOpenAISelectionRNG(deriveOpenAISelectionSeed(req))
// Pick two distinct random indices.
idxA := int(rng.nextUint64() % uint64(n))
idxB := idxA
if n > 1 {
for idxB == idxA {
idxB = int(rng.nextUint64() % uint64(n))
}
}
// Order: better candidate first.
first, second := candidates[idxA], candidates[idxB]
if isOpenAIAccountCandidateBetter(second, first) {
first, second = second, first
}
tryAcquire := func(c openAIAccountCandidateScore) (*AccountSelectionResult, bool, error) {
fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
return nil, false, nil
}
fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
return nil, false, nil
}
result, err := s.service.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency)
if err != nil {
return nil, false, err
}
if result != nil && result.Acquired {
if req.SessionHash != "" {
_ = s.service.BindStickySession(ctx, req.GroupID, req.SessionHash, fresh.ID)
}
return &AccountSelectionResult{
Account: fresh,
Acquired: true,
ReleaseFunc: result.ReleaseFunc,
}, true, nil
}
return nil, false, nil
}
for _, c := range []openAIAccountCandidateScore{first, second} {
result, ok, err := tryAcquire(c)
if err != nil {
return nil, n, 2, loadSkew, err
}
if ok {
return result, n, 2, loadSkew, nil
}
}
// Both slots busy — return wait plan on the better candidate.
cfg := s.service.schedulingConfig()
for _, c := range []openAIAccountCandidateScore{first, second} {
fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
continue
}
return &AccountSelectionResult{
Account: fresh,
WaitPlan: &AccountWaitPlan{
AccountID: fresh.ID,
MaxConcurrency: fresh.Concurrency,
Timeout: cfg.FallbackWaitTimeout,
MaxWaiting: cfg.FallbackMaxWaiting,
},
}, n, 2, loadSkew, nil
}
return nil, n, 2, loadSkew, ErrNoAvailableAccounts
}
func calcLoadSkewByMoments(sum float64, sumSquares float64, count int) float64 {
if count <= 1 {
return 0

View File

@ -966,3 +966,129 @@ func TestDefaultOpenAIAccountScheduler_IsAccountTransportCompatible_Branches(t *
func int64PtrForTest(v int64) *int64 {
return &v
}
func TestAccount_GetQuotaRemainingFraction(t *testing.T) {
// No limit configured → always 1.0 (unlimited)
noLimit := &Account{}
require.Equal(t, 1.0, noLimit.GetQuotaRemainingFraction())
// 50% used
half := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 50.0}}
require.InDelta(t, 0.5, half.GetQuotaRemainingFraction(), 1e-9)
// Fully exhausted
full := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 100.0}}
require.Equal(t, 0.0, full.GetQuotaRemainingFraction())
// Over limit → clamp to 0
over := &Account{Extra: map[string]any{"quota_limit": 100.0, "quota_used": 150.0}}
require.Equal(t, 0.0, over.GetQuotaRemainingFraction())
// Fresh (0 used)
fresh := &Account{Extra: map[string]any{"quota_limit": 200.0, "quota_used": 0.0}}
require.Equal(t, 1.0, fresh.GetQuotaRemainingFraction())
}
func TestOpenAIGatewayService_P2CEnabled(t *testing.T) {
require.False(t, (*OpenAIGatewayService)(nil).openAIWSP2CEnabled())
require.False(t, (&OpenAIGatewayService{}).openAIWSP2CEnabled())
cfg := &config.Config{}
cfg.Gateway.OpenAIWS.EnableP2CScheduling = false
require.False(t, (&OpenAIGatewayService{cfg: cfg}).openAIWSP2CEnabled())
cfg.Gateway.OpenAIWS.EnableP2CScheduling = true
require.True(t, (&OpenAIGatewayService{cfg: cfg}).openAIWSP2CEnabled())
}
func TestOpenAIGatewayService_SchedulerWeights_QuotaField(t *testing.T) {
// Default weights: Quota is 0 (disabled by default)
svc := &OpenAIGatewayService{}
weights := svc.openAIWSSchedulerWeights()
require.Equal(t, 0.0, weights.Quota)
// Config-driven quota weight
cfg := &config.Config{}
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota = 0.4
svcWithCfg := &OpenAIGatewayService{cfg: cfg}
require.Equal(t, 0.4, svcWithCfg.openAIWSSchedulerWeights().Quota)
}
func TestDefaultOpenAIAccountScheduler_SelectByPowerOfTwo_SingleCandidate(t *testing.T) {
ctx := context.Background()
groupID := int64(99001)
account := &Account{ID: 71001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0}
snapshotCache := &openAISnapshotCacheStub{
snapshotAccounts: []*Account{account},
accountsByID: map[int64]*Account{71001: account},
}
cfg := &config.Config{}
cfg.Gateway.OpenAIWS.EnableP2CScheduling = true
cfg.Gateway.OpenAIWS.LBTopK = 5
svc := &OpenAIGatewayService{
accountRepo: stubOpenAIAccountRepo{accounts: []Account{*account}},
cfg: cfg,
schedulerSnapshot: &SchedulerSnapshotService{cache: snapshotCache},
concurrencyService: NewConcurrencyService(stubConcurrencyCache{}),
}
selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "", "gpt-4o", nil, OpenAIUpstreamTransportAny)
require.NoError(t, err)
require.NotNil(t, selection)
require.Equal(t, int64(71001), selection.Account.ID)
require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer)
}
func TestDefaultOpenAIAccountScheduler_SelectByPowerOfTwo_PicksBetterCandidate(t *testing.T) {
ctx := context.Background()
groupID := int64(99002)
// Account A has low priority (better), B has high priority (worse).
// With P2C enabled and a deterministic seed, we should always get a valid selection.
accountA := &Account{ID: 72001, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0}
accountB := &Account{ID: 72002, Platform: PlatformOpenAI, Type: AccountTypeOAuth, Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 10}
snapshotCache := &openAISnapshotCacheStub{
snapshotAccounts: []*Account{accountA, accountB},
accountsByID: map[int64]*Account{72001: accountA, 72002: accountB},
}
cfg := &config.Config{}
cfg.Gateway.OpenAIWS.EnableP2CScheduling = true
svc := &OpenAIGatewayService{
accountRepo: stubOpenAIAccountRepo{accounts: []Account{*accountA, *accountB}},
cfg: cfg,
schedulerSnapshot: &SchedulerSnapshotService{cache: snapshotCache},
concurrencyService: NewConcurrencyService(stubConcurrencyCache{}),
}
selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "", "gpt-4o", nil, OpenAIUpstreamTransportAny)
require.NoError(t, err)
require.NotNil(t, selection)
require.NotNil(t, selection.Account)
require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer)
// Either account is valid; just verify we got a schedulable one.
require.True(t, selection.Account.ID == 72001 || selection.Account.ID == 72002)
}
func TestDefaultOpenAIAccountScheduler_QuotaFactorInfluencesScore(t *testing.T) {
// Verify that quota weight affects scoring by checking GetQuotaRemainingFraction is used.
// Account with high remaining quota should score higher when quota weight > 0.
highQuota := &Account{
ID: 73001, Platform: PlatformOpenAI, Type: AccountTypeOAuth,
Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0,
Extra: map[string]any{"quota_limit": 100.0, "quota_used": 10.0}, // 90% remaining
}
lowQuota := &Account{
ID: 73002, Platform: PlatformOpenAI, Type: AccountTypeOAuth,
Status: StatusActive, Schedulable: true, Concurrency: 5, Priority: 0,
Extra: map[string]any{"quota_limit": 100.0, "quota_used": 90.0}, // 10% remaining
}
require.InDelta(t, 0.9, highQuota.GetQuotaRemainingFraction(), 1e-9)
require.InDelta(t, 0.1, lowQuota.GetQuotaRemainingFraction(), 1e-9)
// With quota weight = 1.0 and all other weights = 0, high-quota account should win.
// We verify the score ordering directly using isOpenAIAccountCandidateBetter.
highScore := openAIAccountCandidateScore{account: highQuota, score: 0.9}
lowScore := openAIAccountCandidateScore{account: lowQuota, score: 0.1}
require.True(t, isOpenAIAccountCandidateBetter(highScore, lowScore))
require.False(t, isOpenAIAccountCandidateBetter(lowScore, highScore))
}