x
Some checks are pending
CI / golangci-lint (push) Waiting to run
CI / windsurf-platform (macos-latest) (push) Waiting to run
CI / windsurf-platform (windows-latest) (push) Waiting to run
CI / test (push) Waiting to run
CI / frontend (push) Waiting to run
Security Scan / backend-security (push) Waiting to run
Security Scan / frontend-security (push) Waiting to run

This commit is contained in:
win 2026-05-30 16:30:59 +08:00
parent a420179abb
commit 3cffaa1e8e
3 changed files with 41 additions and 9 deletions

View File

@ -22,16 +22,18 @@ const (
PlatformOpenAI = "openai" PlatformOpenAI = "openai"
PlatformGemini = "gemini" PlatformGemini = "gemini"
PlatformAntigravity = "antigravity" PlatformAntigravity = "antigravity"
PlatformWindsurf = "windsurf"
) )
// Account type constants // Account type constants
const ( const (
AccountTypeOAuth = "oauth" // OAuth类型账号full scope: profile + inference AccountTypeOAuth = "oauth" // OAuth类型账号full scope: profile + inference
AccountTypeSetupToken = "setup-token" // Setup Token类型账号inference only scope AccountTypeSetupToken = "setup-token" // Setup Token类型账号inference only scope
AccountTypeAPIKey = "apikey" // API Key类型账号 AccountTypeAPIKey = "apikey" // API Key类型账号
AccountTypeUpstream = "upstream" // 上游透传类型账号(通过 Base URL + API Key 连接上游) AccountTypeUpstream = "upstream" // 上游透传类型账号(通过 Base URL + API Key 连接上游)
AccountTypeBedrock = "bedrock" // AWS Bedrock 类型账号(通过 SigV4 签名或 API Key 连接 Bedrock由 credentials.auth_mode 区分) AccountTypeBedrock = "bedrock" // AWS Bedrock 类型账号(通过 SigV4 签名或 API Key 连接 Bedrock由 credentials.auth_mode 区分)
AccountTypeServiceAccount = "service_account" // Google Service Account 类型账号(用于 Vertex AI AccountTypeServiceAccount = "service_account" // Google Service Account 类型账号(用于 Vertex AI
AccountTypeWindsurfSession = "windsurf-session" // Windsurf Session 类型账号(邮箱密码登录获取的 session token + api_key
) )
// Redeem type constants // Redeem type constants

View File

@ -417,6 +417,16 @@ type GatewayCache interface {
// DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理 // DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理
// Delete sticky session binding, used to proactively clean up when account becomes unavailable // Delete sticky session binding, used to proactively clean up when account becomes unavailable
DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error
// GetCascadeID 获取 Windsurf Cascade 会话 ID用于 LS 多轮复用)
// Get the Windsurf Cascade ID bound to a chat session for multi-turn LS reuse.
GetCascadeID(ctx context.Context, key string) (string, error)
// SetCascadeID 写入 Cascade 会话 ID
// Persist the Cascade session ID with the given TTL.
SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error
// DeleteCascadeID 失效 Cascade 会话 IDpanel-not-found / 错误时调用)
// Invalidate the cached Cascade session ID on panel-not-found or upstream error.
DeleteCascadeID(ctx context.Context, key string) error
} }
// derefGroupID safely dereferences *int64 to int64, returning 0 if nil // derefGroupID safely dereferences *int64 to int64, returning 0 if nil
@ -580,6 +590,7 @@ type GatewayService struct {
claudeTokenProvider *ClaudeTokenProvider claudeTokenProvider *ClaudeTokenProvider
sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken
rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken
rpmTokenBucket *RPMTokenBucketService // RPM 令牌桶平滑(可选,由配置开关控制)
userGroupRateResolver *userGroupRateResolver userGroupRateResolver *userGroupRateResolver
userGroupRateCache *gocache.Cache userGroupRateCache *gocache.Cache
userGroupRateSF singleflight.Group userGroupRateSF singleflight.Group
@ -625,6 +636,7 @@ func NewGatewayService(
channelService *ChannelService, channelService *ChannelService,
resolver *ModelPricingResolver, resolver *ModelPricingResolver,
balanceNotifyService *BalanceNotifyService, balanceNotifyService *BalanceNotifyService,
rpmTokenBucketSvc *RPMTokenBucketService,
userPlatformQuotaRepo UserPlatformQuotaRepository, userPlatformQuotaRepo UserPlatformQuotaRepository,
) *GatewayService { ) *GatewayService {
userGroupRateTTL := resolveUserGroupRateCacheTTL(cfg) userGroupRateTTL := resolveUserGroupRateCacheTTL(cfg)
@ -652,6 +664,7 @@ func NewGatewayService(
claudeTokenProvider: claudeTokenProvider, claudeTokenProvider: claudeTokenProvider,
sessionLimitCache: sessionLimitCache, sessionLimitCache: sessionLimitCache,
rpmCache: rpmCache, rpmCache: rpmCache,
rpmTokenBucket: rpmTokenBucketSvc,
userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute), userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute),
settingService: settingService, settingService: settingService,
modelsListCache: gocache.New(modelsListTTL, time.Minute), modelsListCache: gocache.New(modelsListTTL, time.Minute),
@ -2361,6 +2374,23 @@ func (s *GatewayService) IsSingleAntigravityAccountGroup(ctx context.Context, gr
return len(accounts) == 1 return len(accounts) == 1
} }
func (s *GatewayService) IsSingleWindsurfAccountGroup(ctx context.Context, groupID *int64) bool {
accounts, _, err := s.listSchedulableAccounts(ctx, groupID, PlatformWindsurf, true)
if err != nil {
return false
}
return len(accounts) == 1
}
// AcquireRPMToken consumes one RPM token for the given account, waiting up to maxWait if needed.
// Returns nil immediately when RPM smoothing is not configured or the account has no RPM limit.
func (s *GatewayService) AcquireRPMToken(ctx context.Context, accountID int64, rpm int, maxWait time.Duration) error {
if s.rpmTokenBucket == nil {
return nil
}
return s.rpmTokenBucket.AcquireWithWait(ctx, accountID, rpm, maxWait)
}
func (s *GatewayService) isAccountAllowedForPlatform(account *Account, platform string, useMixed bool) bool { func (s *GatewayService) isAccountAllowedForPlatform(account *Account, platform string, useMixed bool) bool {
if account == nil { if account == nil {
return false return false

View File

@ -1393,11 +1393,11 @@ func (s *defaultOpenAIAccountScheduler) selectByPowerOfTwo(
} }
tryAcquire := func(c openAIAccountCandidateScore) (*AccountSelectionResult, bool, error) { tryAcquire := func(c openAIAccountCandidateScore) (*AccountSelectionResult, bool, error) {
fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel, req.RequireCompact) fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel, req.RequireCompact, req.RequiredCapability)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
return nil, false, nil return nil, false, nil
} }
fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel, req.RequireCompact) fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel, req.RequireCompact, req.RequiredCapability)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
return nil, false, nil return nil, false, nil
} }
@ -1431,7 +1431,7 @@ func (s *defaultOpenAIAccountScheduler) selectByPowerOfTwo(
// Both slots busy — return wait plan on the better candidate. // Both slots busy — return wait plan on the better candidate.
cfg := s.service.schedulingConfig() cfg := s.service.schedulingConfig()
for _, c := range []openAIAccountCandidateScore{first, second} { for _, c := range []openAIAccountCandidateScore{first, second} {
fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel, req.RequireCompact) fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, c.account, req.RequestedModel, req.RequireCompact, req.RequiredCapability)
if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) { if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) {
continue continue
} }