x
Some checks failed
Security Scan / backend-security (push) Failing after 1m31s
Security Scan / frontend-security (push) Failing after 7s
CI / test (push) Failing after 6s
CI / frontend (push) Failing after 4s
CI / golangci-lint (push) Failing after 4s
CI / windsurf-platform (macos-latest) (push) Has been cancelled
CI / windsurf-platform (windows-latest) (push) Has been cancelled
Some checks failed
Security Scan / backend-security (push) Failing after 1m31s
Security Scan / frontend-security (push) Failing after 7s
CI / test (push) Failing after 6s
CI / frontend (push) Failing after 4s
CI / golangci-lint (push) Failing after 4s
CI / windsurf-platform (macos-latest) (push) Has been cancelled
CI / windsurf-platform (windows-latest) (push) Has been cancelled
This commit is contained in:
parent
2a9c5da91a
commit
0a3666ef24
@ -153,7 +153,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
|||||||
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService, internal500CounterCache)
|
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService, internal500CounterCache)
|
||||||
windsurfLSService := service.ProvideWindsurfLSService(configConfig)
|
windsurfLSService := service.ProvideWindsurfLSService(configConfig)
|
||||||
windsurfTokenProvider := service.ProvideWindsurfTokenProvider(configConfig, accountRepository, proxyRepository)
|
windsurfTokenProvider := service.ProvideWindsurfTokenProvider(configConfig, accountRepository, proxyRepository)
|
||||||
windsurfChatService := service.ProvideWindsurfChatService(configConfig, windsurfLSService, windsurfTokenProvider)
|
windsurfChatService := service.ProvideWindsurfChatService(configConfig, windsurfLSService, windsurfTokenProvider, gatewayCache)
|
||||||
windsurfGatewayService := service.ProvideWindsurfGatewayService(configConfig, windsurfChatService, accountRepository)
|
windsurfGatewayService := service.ProvideWindsurfGatewayService(configConfig, windsurfChatService, accountRepository)
|
||||||
accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, windsurfChatService, httpUpstream, configConfig, tlsFingerprintProfileService)
|
accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, windsurfChatService, httpUpstream, configConfig, tlsFingerprintProfileService)
|
||||||
crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig)
|
crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig)
|
||||||
|
|||||||
@ -42,12 +42,6 @@ const (
|
|||||||
stickyGraceRetries = 1
|
stickyGraceRetries = 1
|
||||||
// stickyGraceDelay 粘性宽限重试间隔(默认)
|
// stickyGraceDelay 粘性宽限重试间隔(默认)
|
||||||
stickyGraceDelay = 1500 * time.Millisecond
|
stickyGraceDelay = 1500 * time.Millisecond
|
||||||
// windsurfStickyGraceRetries Windsurf 平台专属粘性宽限次数。
|
|
||||||
// Windsurf 的 LS 进程有冷启动开销,且切号后需要重建完整历史上下文(最多 3.5MB),
|
|
||||||
// 宽限次数更多可减少不必要切号,保留 cascade 会话连续性。
|
|
||||||
windsurfStickyGraceRetries = 3
|
|
||||||
// windsurfStickyGraceDelay Windsurf 平台粘性宽限重试间隔(LS 处理更耗时)
|
|
||||||
windsurfStickyGraceDelay = 2000 * time.Millisecond
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// FailoverState 跨循环迭代共享的 failover 状态
|
// FailoverState 跨循环迭代共享的 failover 状态
|
||||||
|
|||||||
@ -749,15 +749,16 @@ func TestHandleFailoverError_StickyGraceConfig(t *testing.T) {
|
|||||||
require.Equal(t, 1, fs.SwitchCount, "宽限用完后应切换")
|
require.Equal(t, 1, fs.SwitchCount, "宽限用完后应切换")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Windsurf专属配置grace=3次", func(t *testing.T) {
|
t.Run("自定义grace=3次", func(t *testing.T) {
|
||||||
mock := &mockTempUnscheduler{}
|
mock := &mockTempUnscheduler{}
|
||||||
|
const customGrace = 3
|
||||||
fs := NewFailoverState(5, true).
|
fs := NewFailoverState(5, true).
|
||||||
WithStickyBoundAccount(100).
|
WithStickyBoundAccount(100).
|
||||||
WithStickyGraceConfig(windsurfStickyGraceRetries, 10*time.Millisecond) // 用极短间隔加速测试
|
WithStickyGraceConfig(customGrace, 10*time.Millisecond) // 用极短间隔加速测试
|
||||||
err := newTestFailoverErr(500, false, false)
|
err := newTestFailoverErr(500, false, false)
|
||||||
|
|
||||||
// 前 3 次都应该是宽限重试
|
// 前 3 次都应该是宽限重试
|
||||||
for i := 1; i <= windsurfStickyGraceRetries; i++ {
|
for i := 1; i <= customGrace; i++ {
|
||||||
action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformWindsurf, err)
|
action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformWindsurf, err)
|
||||||
require.Equal(t, FailoverContinue, action, "第%d次应为宽限重试", i)
|
require.Equal(t, FailoverContinue, action, "第%d次应为宽限重试", i)
|
||||||
require.Equal(t, i, fs.stickyGraceUsed)
|
require.Equal(t, i, fs.stickyGraceUsed)
|
||||||
@ -774,7 +775,7 @@ func TestHandleFailoverError_StickyGraceConfig(t *testing.T) {
|
|||||||
mock := &mockTempUnscheduler{}
|
mock := &mockTempUnscheduler{}
|
||||||
fs := NewFailoverState(5, true).
|
fs := NewFailoverState(5, true).
|
||||||
WithStickyBoundAccount(100).
|
WithStickyBoundAccount(100).
|
||||||
WithStickyGraceConfig(windsurfStickyGraceRetries, 10*time.Millisecond)
|
WithStickyGraceConfig(3, 10*time.Millisecond)
|
||||||
err := newTestFailoverErr(500, false, false)
|
err := newTestFailoverErr(500, false, false)
|
||||||
|
|
||||||
// 非 sticky 账号 200 不走宽限,直接切换
|
// 非 sticky 账号 200 不走宽限,直接切换
|
||||||
|
|||||||
@ -543,9 +543,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
fs := NewFailoverState(h.maxAccountSwitches, hasBoundSession).WithStickyBoundAccount(sessionBoundAccountID)
|
fs := NewFailoverState(h.maxAccountSwitches, hasBoundSession).WithStickyBoundAccount(sessionBoundAccountID)
|
||||||
if platform == service.PlatformWindsurf {
|
|
||||||
fs.WithStickyGraceConfig(windsurfStickyGraceRetries, windsurfStickyGraceDelay)
|
|
||||||
}
|
|
||||||
retryWithFallback := false
|
retryWithFallback := false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -725,7 +722,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
|||||||
// 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover
|
// 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover
|
||||||
writerSizeBeforeForward := c.Writer.Size()
|
writerSizeBeforeForward := c.Writer.Size()
|
||||||
if account.Platform == service.PlatformWindsurf {
|
if account.Platform == service.PlatformWindsurf {
|
||||||
result, err = h.windsurfGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
|
windsurfGroupID := int64(0)
|
||||||
|
if apiKey.GroupID != nil {
|
||||||
|
windsurfGroupID = *apiKey.GroupID
|
||||||
|
}
|
||||||
|
result, err = h.windsurfGatewayService.Forward(requestCtx, c, account, body, hasBoundSession, windsurfGroupID, sessionHash)
|
||||||
} else if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey {
|
} else if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey {
|
||||||
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
|
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -9,7 +9,10 @@ import (
|
|||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
const stickySessionPrefix = "sticky_session:"
|
const (
|
||||||
|
stickySessionPrefix = "sticky_session:"
|
||||||
|
cascadeIDPrefix = "windsurf_cascade:"
|
||||||
|
)
|
||||||
|
|
||||||
type gatewayCache struct {
|
type gatewayCache struct {
|
||||||
rdb *redis.Client
|
rdb *redis.Client
|
||||||
@ -51,3 +54,29 @@ func (c *gatewayCache) DeleteSessionAccountID(ctx context.Context, groupID int64
|
|||||||
key := buildSessionKey(groupID, sessionHash)
|
key := buildSessionKey(groupID, sessionHash)
|
||||||
return c.rdb.Del(ctx, key).Err()
|
return c.rdb.Del(ctx, key).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildCascadeKey 构造 Windsurf Cascade ID 缓存 key。
|
||||||
|
// 上层已将 (groupID, accountID, modelUID, lsEndpoint, sessionHash, sysPromptHash) 哈希为单一字符串。
|
||||||
|
func buildCascadeKey(key string) string {
|
||||||
|
return cascadeIDPrefix + key
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCascadeID 读取 Cascade 会话 ID。redis.Nil 视为未命中(返回空串与 nil)。
|
||||||
|
func (c *gatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) {
|
||||||
|
v, err := c.rdb.Get(ctx, buildCascadeKey(key)).Result()
|
||||||
|
if err == redis.Nil {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
return v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetCascadeID 写入 Cascade 会话 ID。
|
||||||
|
func (c *gatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error {
|
||||||
|
return c.rdb.Set(ctx, buildCascadeKey(key), cascadeID, ttl).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteCascadeID 失效 Cascade 会话 ID。
|
||||||
|
func (c *gatewayCache) DeleteCascadeID(ctx context.Context, key string) error {
|
||||||
|
return c.rdb.Del(ctx, buildCascadeKey(key)).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@ -104,6 +104,61 @@ func (s *GatewayCacheSuite) TestGetSessionAccountID_CorruptedValue() {
|
|||||||
require.False(s.T(), errors.Is(err, redis.Nil), "expected parsing error, not redis.Nil")
|
require.False(s.T(), errors.Is(err, redis.Nil), "expected parsing error, not redis.Nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *GatewayCacheSuite) TestGetCascadeID_Missing() {
|
||||||
|
id, err := s.cache.GetCascadeID(s.ctx, "nonexistent-cascade-key")
|
||||||
|
require.NoError(s.T(), err, "missing cascade key should return nil error")
|
||||||
|
require.Equal(s.T(), "", id, "missing cascade key should return empty string")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GatewayCacheSuite) TestSetAndGetCascadeID() {
|
||||||
|
key := "ab12cd34"
|
||||||
|
cascadeID := "cascade-uuid-xyz"
|
||||||
|
ttl := 30 * time.Minute
|
||||||
|
|
||||||
|
require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, cascadeID, ttl), "SetCascadeID")
|
||||||
|
|
||||||
|
got, err := s.cache.GetCascadeID(s.ctx, key)
|
||||||
|
require.NoError(s.T(), err, "GetCascadeID")
|
||||||
|
require.Equal(s.T(), cascadeID, got, "cascade id round-trip mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GatewayCacheSuite) TestCascadeID_TTL() {
|
||||||
|
key := "ttl-key"
|
||||||
|
ttl := 30 * time.Minute
|
||||||
|
|
||||||
|
require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, "cid", ttl), "SetCascadeID")
|
||||||
|
|
||||||
|
redisKey := buildCascadeKey(key)
|
||||||
|
got, err := s.rdb.TTL(s.ctx, redisKey).Result()
|
||||||
|
require.NoError(s.T(), err, "TTL after SetCascadeID")
|
||||||
|
s.AssertTTLWithin(got, 1*time.Second, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GatewayCacheSuite) TestDeleteCascadeID() {
|
||||||
|
key := "del-key"
|
||||||
|
require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, "cid", 1*time.Minute), "SetCascadeID")
|
||||||
|
require.NoError(s.T(), s.cache.DeleteCascadeID(s.ctx, key), "DeleteCascadeID")
|
||||||
|
|
||||||
|
got, err := s.cache.GetCascadeID(s.ctx, key)
|
||||||
|
require.NoError(s.T(), err, "GetCascadeID after delete should not error")
|
||||||
|
require.Equal(s.T(), "", got, "deleted cascade key should return empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 验证 cascade key 与 sticky session key 命名空间隔离(前缀不同)。
|
||||||
|
func (s *GatewayCacheSuite) TestCascadeID_NamespaceIsolation() {
|
||||||
|
commonID := "shared-id"
|
||||||
|
require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, commonID, "cascade-value", 1*time.Minute), "SetCascadeID")
|
||||||
|
require.NoError(s.T(), s.cache.SetSessionAccountID(s.ctx, 1, commonID, 42, 1*time.Minute), "SetSessionAccountID")
|
||||||
|
|
||||||
|
gotCascade, err := s.cache.GetCascadeID(s.ctx, commonID)
|
||||||
|
require.NoError(s.T(), err, "GetCascadeID")
|
||||||
|
require.Equal(s.T(), "cascade-value", gotCascade, "cascade namespace must not be polluted by sticky session")
|
||||||
|
|
||||||
|
gotAccount, err := s.cache.GetSessionAccountID(s.ctx, 1, commonID)
|
||||||
|
require.NoError(s.T(), err, "GetSessionAccountID")
|
||||||
|
require.Equal(s.T(), int64(42), gotAccount, "sticky session must not be polluted by cascade write")
|
||||||
|
}
|
||||||
|
|
||||||
func TestGatewayCacheSuite(t *testing.T) {
|
func TestGatewayCacheSuite(t *testing.T) {
|
||||||
suite.Run(t, new(GatewayCacheSuite))
|
suite.Run(t, new(GatewayCacheSuite))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -238,6 +238,18 @@ func (m *mockGatewayCacheForPlatform) DeleteSessionAccountID(ctx context.Context
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockGatewayCacheForPlatform) GetCascadeID(ctx context.Context, key string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockGatewayCacheForPlatform) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockGatewayCacheForPlatform) DeleteCascadeID(ctx context.Context, key string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type mockGroupRepoForGateway struct {
|
type mockGroupRepoForGateway struct {
|
||||||
groups map[int64]*Group
|
groups map[int64]*Group
|
||||||
getByIDCalls int
|
getByIDCalls int
|
||||||
|
|||||||
@ -402,6 +402,16 @@ type GatewayCache interface {
|
|||||||
// DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理
|
// DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理
|
||||||
// Delete sticky session binding, used to proactively clean up when account becomes unavailable
|
// Delete sticky session binding, used to proactively clean up when account becomes unavailable
|
||||||
DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error
|
DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error
|
||||||
|
|
||||||
|
// GetCascadeID 获取 Windsurf Cascade 会话 ID(用于 LS 多轮复用)
|
||||||
|
// Get the Windsurf Cascade ID bound to a chat session for multi-turn LS reuse.
|
||||||
|
GetCascadeID(ctx context.Context, key string) (string, error)
|
||||||
|
// SetCascadeID 写入 Cascade 会话 ID
|
||||||
|
// Persist the Cascade session ID with the given TTL.
|
||||||
|
SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error
|
||||||
|
// DeleteCascadeID 失效 Cascade 会话 ID(panel-not-found / 错误时调用)
|
||||||
|
// Invalidate the cached Cascade session ID on panel-not-found or upstream error.
|
||||||
|
DeleteCascadeID(ctx context.Context, key string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// derefGroupID safely dereferences *int64 to int64, returning 0 if nil
|
// derefGroupID safely dereferences *int64 to int64, returning 0 if nil
|
||||||
|
|||||||
@ -149,6 +149,30 @@ func (c *schedulerTestGatewayCache) DeleteSessionAccountID(ctx context.Context,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) DeleteCascadeID(ctx context.Context, key string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *schedulerTestGatewayCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func newSchedulerTestOpenAIWSV2Config() *config.Config {
|
func newSchedulerTestOpenAIWSV2Config() *config.Config {
|
||||||
cfg := &config.Config{}
|
cfg := &config.Config{}
|
||||||
cfg.Gateway.OpenAIWS.Enabled = true
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
|||||||
@ -344,6 +344,30 @@ func (c *stubGatewayCache) DeleteSessionAccountID(ctx context.Context, groupID i
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) DeleteCascadeID(ctx context.Context, key string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *stubGatewayCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) {
|
func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
resetAt := now.Add(10 * time.Minute)
|
resetAt := now.Add(10 * time.Minute)
|
||||||
|
|||||||
@ -185,6 +185,18 @@ func (c *openAIWSStateStoreTimeoutProbeCache) RefreshSessionTTL(context.Context,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *openAIWSStateStoreTimeoutProbeCache) GetCascadeID(ctx context.Context, _ string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *openAIWSStateStoreTimeoutProbeCache) SetCascadeID(ctx context.Context, _ string, _ string, _ time.Duration) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *openAIWSStateStoreTimeoutProbeCache) DeleteCascadeID(ctx context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *openAIWSStateStoreTimeoutProbeCache) DeleteSessionAccountID(ctx context.Context, _ int64, _ string) error {
|
func (c *openAIWSStateStoreTimeoutProbeCache) DeleteSessionAccountID(ctx context.Context, _ int64, _ string) error {
|
||||||
if deadline, ok := ctx.Deadline(); ok {
|
if deadline, ok := ctx.Deadline(); ok {
|
||||||
c.deleteHasDeadline = true
|
c.deleteHasDeadline = true
|
||||||
@ -193,6 +205,18 @@ func (c *openAIWSStateStoreTimeoutProbeCache) DeleteSessionAccountID(ctx context
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetWindsurfCascadeID is a no-op stub returning an empty ID and nil error.
// NOTE(review): the same type also defines GetCascadeID; confirm whether the
// Windsurf-prefixed variants are still required by any interface.
func (c *openAIWSStateStoreTimeoutProbeCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) {
	return "", nil
}

// SetWindsurfCascadeID is a no-op stub: the arguments are ignored and nil is returned.
func (c *openAIWSStateStoreTimeoutProbeCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error {
	return nil
}

// DeleteWindsurfCascadeID is a no-op stub: the arguments are ignored and nil is returned.
func (c *openAIWSStateStoreTimeoutProbeCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error {
	return nil
}
|
||||||
|
|
||||||
func TestOpenAIWSStateStore_RedisOpsUseShortTimeout(t *testing.T) {
|
func TestOpenAIWSStateStore_RedisOpsUseShortTimeout(t *testing.T) {
|
||||||
probe := &openAIWSStateStoreTimeoutProbeCache{}
|
probe := &openAIWSStateStoreTimeoutProbeCache{}
|
||||||
store := NewOpenAIWSStateStore(probe)
|
store := NewOpenAIWSStateStore(probe)
|
||||||
|
|||||||
@ -2,6 +2,8 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
@ -11,28 +13,38 @@ import (
|
|||||||
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
|
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// cascadeReuseTTL is how long a Windsurf Cascade session ID is kept in the
// cache before it expires.
//
// Per the original author's note, the value is aligned with the language
// server's own cascade aging window: past 30 minutes the LS will most likely
// answer panel-not-found even on a local cache hit, and chatCascade's
// fallback path covers that case. (Not verifiable from this file; confirm
// against the LS behavior before changing.)
const cascadeReuseTTL time.Duration = 30 * time.Minute
|
||||||
|
|
||||||
type WindsurfChatService struct {
|
type WindsurfChatService struct {
|
||||||
cfg config.WindsurfConfig
|
cfg config.WindsurfConfig
|
||||||
lsService *WindsurfLSService
|
lsService *WindsurfLSService
|
||||||
tokenProvider *WindsurfTokenProvider
|
tokenProvider *WindsurfTokenProvider
|
||||||
pool *windsurf.ConversationPool
|
cache GatewayCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWindsurfChatService(
|
func NewWindsurfChatService(
|
||||||
cfg config.WindsurfConfig,
|
cfg config.WindsurfConfig,
|
||||||
lsService *WindsurfLSService,
|
lsService *WindsurfLSService,
|
||||||
tokenProvider *WindsurfTokenProvider,
|
tokenProvider *WindsurfTokenProvider,
|
||||||
|
cache GatewayCache,
|
||||||
) *WindsurfChatService {
|
) *WindsurfChatService {
|
||||||
return &WindsurfChatService{
|
return &WindsurfChatService{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
lsService: lsService,
|
lsService: lsService,
|
||||||
tokenProvider: tokenProvider,
|
tokenProvider: tokenProvider,
|
||||||
pool: windsurf.NewConversationPool(),
|
cache: cache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type WindsurfChatRequest struct {
|
type WindsurfChatRequest struct {
|
||||||
AccountID int64
|
AccountID int64
|
||||||
|
// GroupID 来自 apiKey.GroupID,用于 Cascade 复用 cache 隔离。0 表示不参与缓存。
|
||||||
|
GroupID int64
|
||||||
|
// SessionHash 与 sticky session 共用,标识同一对话流。空串表示不复用 cascade。
|
||||||
|
SessionHash string
|
||||||
Model string
|
Model string
|
||||||
Messages []windsurf.ChatMessage
|
Messages []windsurf.ChatMessage
|
||||||
Stream bool
|
Stream bool
|
||||||
@ -83,11 +95,11 @@ func (s *WindsurfChatService) Chat(ctx context.Context, req *WindsurfChatRequest
|
|||||||
var resp *WindsurfChatResponse
|
var resp *WindsurfChatResponse
|
||||||
switch mode {
|
switch mode {
|
||||||
case "cascade":
|
case "cascade":
|
||||||
resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images)
|
resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images, req.AccountID, req.GroupID, req.SessionHash)
|
||||||
case "legacy":
|
case "legacy":
|
||||||
resp, err = s.chatLegacy(ctx, lease.Client, token.APIKey, meta, req.Messages, modelKey)
|
resp, err = s.chatLegacy(ctx, lease.Client, token.APIKey, meta, req.Messages, modelKey)
|
||||||
default:
|
default:
|
||||||
resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images)
|
resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images, req.AccountID, req.GroupID, req.SessionHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -146,7 +158,20 @@ func injectModelIdentity(messages []windsurf.ChatMessage, meta *windsurf.ModelMe
|
|||||||
return append([]windsurf.ChatMessage{identity}, messages...)
|
return append([]windsurf.ChatMessage{identity}, messages...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf.LocalLSClient, apiKey string, meta *windsurf.ModelMeta, messages []windsurf.ChatMessage, toolPreamble string, modelKey string, lsEndpoint string, images []windsurf.CascadeImage) (*WindsurfChatResponse, error) {
|
func (s *WindsurfChatService) chatCascade(
|
||||||
|
ctx context.Context,
|
||||||
|
client *windsurf.LocalLSClient,
|
||||||
|
apiKey string,
|
||||||
|
meta *windsurf.ModelMeta,
|
||||||
|
messages []windsurf.ChatMessage,
|
||||||
|
toolPreamble string,
|
||||||
|
modelKey string,
|
||||||
|
lsEndpoint string,
|
||||||
|
images []windsurf.CascadeImage,
|
||||||
|
accountID int64,
|
||||||
|
groupID int64,
|
||||||
|
sessionHash string,
|
||||||
|
) (*WindsurfChatResponse, error) {
|
||||||
modelUID := ""
|
modelUID := ""
|
||||||
modelEnumHint := 0
|
modelEnumHint := 0
|
||||||
if meta != nil {
|
if meta != nil {
|
||||||
@ -154,68 +179,63 @@ func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf.
|
|||||||
modelEnumHint = meta.EnumValue
|
modelEnumHint = meta.EnumValue
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Model identity prompt injection ──
|
|
||||||
// When the client doesn't provide its own system prompt, prepend one so
|
|
||||||
// the model identifies itself as the requested model rather than leaking
|
|
||||||
// the underlying Windsurf/Cascade backend identity.
|
|
||||||
// Skip when the client already has a system message (Claude Code / Cline)
|
|
||||||
// to avoid triggering Cascade anti-injection on reasoning models.
|
|
||||||
messages = injectModelIdentity(messages, meta, modelKey)
|
messages = injectModelIdentity(messages, meta, modelKey)
|
||||||
|
|
||||||
// 图像能力 gate:仅在请求含图时检查。
|
|
||||||
// 策略:fail-open on RPC error;显式 supports_images=false 时拒绝(返回 CascadeModelError 触发 failover)。
|
|
||||||
if len(images) > 0 {
|
if len(images) > 0 {
|
||||||
found, ok, err := client.ModelSupportsImages(ctx, apiKey, modelUID)
|
found, ok, err := client.ModelSupportsImages(ctx, apiKey, modelUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Warn("windsurf_cascade_caps_fetch_failed", "model", modelUID, "error", err)
|
slog.Warn("windsurf_cascade_caps_fetch_failed", "model", modelUID, "error", err)
|
||||||
// fail-open
|
|
||||||
} else if found && !ok {
|
} else if found && !ok {
|
||||||
return nil, fmt.Errorf("model %q does not support image inputs in Windsurf Cascade", modelUID)
|
return nil, fmt.Errorf("model %q does not support image inputs in Windsurf Cascade", modelUID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fpBefore := windsurf.FingerprintBefore(messages, modelKey, apiKey)
|
// reuse 路径:sessionHash 非空且 cache 命中即复用 cascade,
|
||||||
// failover 切号后禁止复用 cascade:cascade_id 属于上一个账号的 LS,
|
// userText 缩为"system + 最后一条 user"——cascade trajectory 已承载历史。
|
||||||
// 在当前账号上一定会触发 "panel state not found" 浪费一次请求。
|
canReuse := sessionHash != "" && s.cache != nil && groupID != 0
|
||||||
// 同时切号场景下需要提升历史预算——新账号完全没有服务端上下文,
|
cacheKey := ""
|
||||||
// 必须把完整聊天记录塞进文本里。
|
reuseID := ""
|
||||||
skipReuse := false
|
if canReuse {
|
||||||
switchover := false
|
cacheKey = buildCascadeCacheKey(groupID, accountID, modelUID, lsEndpoint, sessionHash, sysPromptHash(messages))
|
||||||
if switches, ok := AccountSwitchCountFromContext(ctx); ok && switches > 0 {
|
if id, err := s.cache.GetCascadeID(ctx, cacheKey); err == nil && id != "" {
|
||||||
skipReuse = true
|
reuseID = id
|
||||||
switchover = true
|
} else if err != nil {
|
||||||
}
|
slog.Warn("windsurf_cascade_cache_get_failed", "error", err)
|
||||||
var entry *windsurf.ConversationEntry
|
}
|
||||||
if !skipReuse {
|
|
||||||
entry = s.pool.Checkout(fpBefore)
|
|
||||||
}
|
|
||||||
isResume := entry != nil && entry.CascadeID != ""
|
|
||||||
|
|
||||||
var reuseCascadeID string
|
|
||||||
if isResume {
|
|
||||||
reuseCascadeID = entry.CascadeID
|
|
||||||
slog.Info("windsurf_cascade_reuse_hit", "cascade_id", reuseCascadeID[:8], "model", modelKey)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
userText := buildCascadeText(messages, modelUID, isResume, switchover)
|
var userText string
|
||||||
|
if reuseID != "" {
|
||||||
|
userText = buildCascadeTextForReuse(messages)
|
||||||
|
} else {
|
||||||
|
userText = buildCascadeText(messages, modelUID)
|
||||||
|
}
|
||||||
|
|
||||||
result, err := client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, reuseCascadeID, modelEnumHint, images)
|
result, err := client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, reuseID, modelEnumHint, images)
|
||||||
if err != nil && isResume {
|
|
||||||
slog.Warn("windsurf_cascade_reuse_failed", "error", err, "model", modelKey)
|
// reuse 触发 panel-not-found:清缓存 + 用 full-history 重试一次。
|
||||||
// panel-state-not-found 恢复:新 cascade 没有服务端历史,必须发完整聊天记录。
|
if err != nil && reuseID != "" && isPanelNotFound(err) {
|
||||||
userText = buildCascadeText(messages, modelUID, false, true)
|
slog.Info("windsurf_cascade_reuse_invalidated", "cascade_id", reuseID, "reason", "panel_not_found")
|
||||||
|
if cacheKey != "" {
|
||||||
|
_ = s.cache.DeleteCascadeID(ctx, cacheKey)
|
||||||
|
}
|
||||||
|
userText = buildCascadeText(messages, modelUID)
|
||||||
result, err = client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, "", modelEnumHint, images)
|
result, err = client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, "", modelEnumHint, images)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// 任何错误都失效缓存(保守策略,避免下次复用到坏 cascade)。
|
||||||
|
if canReuse && cacheKey != "" {
|
||||||
|
_ = s.cache.DeleteCascadeID(ctx, cacheKey)
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if result.CascadeID != "" && result.Text != "" {
|
// 成功:写回 cache。
|
||||||
fpAfter := windsurf.FingerprintAfter(messages, modelKey, apiKey)
|
if canReuse && cacheKey != "" && result.CascadeID != "" {
|
||||||
s.pool.Checkin(fpAfter, &windsurf.ConversationEntry{
|
if setErr := s.cache.SetCascadeID(ctx, cacheKey, result.CascadeID, cascadeReuseTTL); setErr != nil {
|
||||||
CascadeID: result.CascadeID,
|
slog.Warn("windsurf_cascade_cache_set_failed", "error", setErr)
|
||||||
APIKey: apiKey,
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &WindsurfChatResponse{
|
return &WindsurfChatResponse{
|
||||||
@ -229,6 +249,64 @@ func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf.
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildCascadeCacheKey derives the cache key used for Cascade session reuse.
//
// The key is the first 24 hex characters of a SHA-256 digest over groupID,
// accountID, modelUID, lsEndpoint, sessionHash and sysHash, so a change in
// any component (account, model, LS instance, session, system prompt)
// produces a different key and therefore a cache miss.
//
// The string components are written with %q rather than %s. With bare %s a
// '|' inside one field could shift the field boundaries, letting two
// different tuples (e.g. modelUID "a|b" + lsEndpoint "c" versus modelUID "a"
// + lsEndpoint "b|c") hash to the same key. Quoting makes the encoding
// unambiguous. Keys therefore differ from those produced by the unquoted
// format; entries written under the old format simply miss and expire.
func buildCascadeCacheKey(groupID, accountID int64, modelUID, lsEndpoint, sessionHash, sysHash string) string {
	h := sha256.New()
	// hash.Hash.Write is documented to never return an error, so the
	// Fprintf result is intentionally not checked.
	fmt.Fprintf(h, "%d|%d|%q|%q|%q|%q", groupID, accountID, modelUID, lsEndpoint, sessionHash, sysHash)
	return hex.EncodeToString(h.Sum(nil))[:24]
}
|
||||||
|
|
||||||
|
// sysPromptHash 计算 system 消息内容的指纹。system 变化必须强制开新 cascade,
|
||||||
|
// 否则旧 cascade 内已建立的"角色/约束"语境会污染新对话。
|
||||||
|
func sysPromptHash(messages []windsurf.ChatMessage) string {
|
||||||
|
h := sha256.New()
|
||||||
|
for _, m := range messages {
|
||||||
|
if m.Role == "system" {
|
||||||
|
h.Write([]byte(m.Content))
|
||||||
|
h.Write([]byte{0})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return hex.EncodeToString(h.Sum(nil))[:16]
|
||||||
|
}
|
||||||
|
|
||||||
|
// isPanelNotFound reports whether err looks like the language server's
// "cascade panel state not found" failure.
//
// The check is a case-insensitive match on the error text: it is true when
// the message contains "panel state not found", or contains both "not_found"
// and "panel". A nil error is never a match.
//
// NOTE(review): the original comment says this mirrors the detection inside
// windsurf.LocalLSClient; that code is not visible here, so keep the two in
// sync manually.
func isPanelNotFound(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "panel state not found") ||
		(strings.Contains(msg, "not_found") && strings.Contains(msg, "panel"))
}
|
||||||
|
|
||||||
|
// buildCascadeTextForReuse 构造 reuse 模式下的 user turn 文本:
|
||||||
|
// 仅含 system instructions + 最新一条 user 消息。Cascade 内部已通过 trajectory 保留前序历史。
|
||||||
|
// 注意:cacheKey 已包含 sysPromptHash,system 变化会强制 cache miss → 走 buildCascadeText 全量路径。
|
||||||
|
func buildCascadeTextForReuse(messages []windsurf.ChatMessage) string {
|
||||||
|
var sysParts []string
|
||||||
|
var lastUser string
|
||||||
|
for _, m := range messages {
|
||||||
|
if m.Role == "system" {
|
||||||
|
sysParts = append(sysParts, m.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := len(messages) - 1; i >= 0; i-- {
|
||||||
|
if messages[i].Role == "user" {
|
||||||
|
lastUser = messages[i].Content
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sys := strings.TrimSpace(strings.Join(sysParts, "\n"))
|
||||||
|
if sys != "" {
|
||||||
|
return "<system_instructions>\n" + sys + "\n</system_instructions>\n\n" + lastUser
|
||||||
|
}
|
||||||
|
return lastUser
|
||||||
|
}
|
||||||
|
|
||||||
func (s *WindsurfChatService) chatLegacy(ctx context.Context, client *windsurf.LocalLSClient, apiKey string, meta *windsurf.ModelMeta, messages []windsurf.ChatMessage, modelKey string) (*WindsurfChatResponse, error) {
|
func (s *WindsurfChatService) chatLegacy(ctx context.Context, client *windsurf.LocalLSClient, apiKey string, meta *windsurf.ModelMeta, messages []windsurf.ChatMessage, modelKey string) (*WindsurfChatResponse, error) {
|
||||||
modelEnum := 0
|
modelEnum := 0
|
||||||
modelName := ""
|
modelName := ""
|
||||||
@ -249,19 +327,12 @@ func (s *WindsurfChatService) chatLegacy(ctx context.Context, client *windsurf.L
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
cascadeMaxHistoryBytes = 200_000
|
cascadeMaxHistoryBytes = 200_000
|
||||||
cascade1MHistoryBytes = 900_000
|
cascade1MHistoryBytes = 900_000
|
||||||
// cascadeSwitchoverHistoryBytes 是切号 / panel-state-not-found 恢复场景下的
|
cascadeMultiTurnPreamble = "The following is a multi-turn conversation. You MUST remember and use all information from prior turns."
|
||||||
// "尽量塞进完整历史" 预算。目标是让新账号拿到尽可能完整的对话上下文。
|
|
||||||
// 3.5MB 留了 500KB 给 proto 其它字段(metadata/config/images),避开 gRPC 4MB 默认上限。
|
|
||||||
cascadeSwitchoverHistoryBytes = 3_500_000
|
|
||||||
cascadeMultiTurnPreamble = "The following is a multi-turn conversation. You MUST remember and use all information from prior turns."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func cascadeHistoryBudget(modelUID string, switchover bool) int {
|
func cascadeHistoryBudget(modelUID string) int {
|
||||||
if switchover {
|
|
||||||
return cascadeSwitchoverHistoryBytes
|
|
||||||
}
|
|
||||||
if strings.Contains(strings.ToLower(modelUID), "1m") {
|
if strings.Contains(strings.ToLower(modelUID), "1m") {
|
||||||
return cascade1MHistoryBytes
|
return cascade1MHistoryBytes
|
||||||
}
|
}
|
||||||
@ -269,14 +340,9 @@ func cascadeHistoryBudget(modelUID string, switchover bool) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// buildCascadeText constructs the full text payload for SendUserCascadeMessage.
|
// buildCascadeText constructs the full text payload for SendUserCascadeMessage.
|
||||||
// If isResume is true, only the last user message is sent (cascade already has context).
|
// System prompt is wrapped in <system_instructions>, multi-turn history uses
|
||||||
// Otherwise: system prompt wrapped in <system_instructions>, multi-turn history
|
// <human>/<assistant> tags with a budget cap to trim the oldest turns.
|
||||||
// with <human>/<assistant> tags, and a budget cap to trim old turns.
|
func buildCascadeText(messages []windsurf.ChatMessage, modelUID string) string {
|
||||||
//
|
|
||||||
// switchover=true 提升历史预算到 cascadeSwitchoverHistoryBytes(~3.5MB),
|
|
||||||
// 用于切号 / panel-state-not-found 恢复场景——新账号/新 cascade 没有服务端历史,
|
|
||||||
// 必须把完整聊天记录塞进文本里。isResume=true 时该参数被忽略(resume 只发最后一条)。
|
|
||||||
func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume, switchover bool) string {
|
|
||||||
var systemParts []string
|
var systemParts []string
|
||||||
var convo []windsurf.ChatMessage
|
var convo []windsurf.ChatMessage
|
||||||
|
|
||||||
@ -292,11 +358,6 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resume: cascade already has context, only send last user message
|
|
||||||
if isResume {
|
|
||||||
return convo[len(convo)-1].Content
|
|
||||||
}
|
|
||||||
|
|
||||||
sysText := strings.TrimSpace(strings.Join(systemParts, "\n"))
|
sysText := strings.TrimSpace(strings.Join(systemParts, "\n"))
|
||||||
if sysText != "" {
|
if sysText != "" {
|
||||||
sysText = "<system_instructions>\n" + sysText + "\n</system_instructions>"
|
sysText = "<system_instructions>\n" + sysText + "\n</system_instructions>"
|
||||||
@ -312,10 +373,9 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Multi-turn: build history with budget trimming
|
// Multi-turn: build history with budget trimming
|
||||||
maxBytes := cascadeHistoryBudget(modelUID, switchover)
|
maxBytes := cascadeHistoryBudget(modelUID)
|
||||||
historyBytes := len(sysText)
|
historyBytes := len(sysText)
|
||||||
|
|
||||||
// Walk backward from second-to-last, collecting turns that fit
|
|
||||||
var lines []string
|
var lines []string
|
||||||
droppedTurns := 0
|
droppedTurns := 0
|
||||||
for i := len(convo) - 2; i >= 0; i-- {
|
for i := len(convo) - 2; i >= 0; i-- {
|
||||||
@ -332,20 +392,12 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume
|
|||||||
"total_turns", len(convo),
|
"total_turns", len(convo),
|
||||||
"kept_kb", historyBytes/1024,
|
"kept_kb", historyBytes/1024,
|
||||||
"dropped_turns", droppedTurns,
|
"dropped_turns", droppedTurns,
|
||||||
"switchover", switchover,
|
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
lines = append([]string{line}, lines...)
|
lines = append([]string{line}, lines...)
|
||||||
historyBytes += len(line)
|
historyBytes += len(line)
|
||||||
}
|
}
|
||||||
if switchover && droppedTurns == 0 {
|
|
||||||
slog.Info("windsurf_cascade_switchover_history",
|
|
||||||
"total_turns", len(convo),
|
|
||||||
"kept_kb", historyBytes/1024,
|
|
||||||
"dropped_turns", 0,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
latest := convo[len(convo)-1]
|
latest := convo[len(convo)-1]
|
||||||
text := cascadeMultiTurnPreamble + "\n\n" +
|
text := cascadeMultiTurnPreamble + "\n\n" +
|
||||||
|
|||||||
@ -1,21 +1,17 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
|
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test that the switchover flag expands the history budget to ~3.5MB and preserves
|
// buildCascadeText always sends full history regardless of account switches.
|
||||||
// all turns for a large multi-turn conversation that would otherwise be trimmed
|
func TestBuildCascadeText_AlwaysFullHistory(t *testing.T) {
|
||||||
// under the normal 200KB budget. This guards the core fix: after a Windsurf
|
|
||||||
// account switch, the new account must receive the full chat history.
|
|
||||||
func TestBuildCascadeText_SwitchoverKeepsFullHistory(t *testing.T) {
|
|
||||||
// Build a ~1.5MB multi-turn history: 30 turns of ~50KB each (alternating
|
|
||||||
// user/assistant). Exceeds the normal 200KB cap; well within the 3.5MB cap.
|
|
||||||
const perTurnBytes = 50 * 1024
|
const perTurnBytes = 50 * 1024
|
||||||
const turns = 30
|
const turns = 3 // keep small so it fits in the 200KB budget
|
||||||
bulk := strings.Repeat("x", perTurnBytes)
|
bulk := strings.Repeat("x", perTurnBytes)
|
||||||
|
|
||||||
var messages []windsurf.ChatMessage
|
var messages []windsurf.ChatMessage
|
||||||
@ -27,47 +23,29 @@ func TestBuildCascadeText_SwitchoverKeepsFullHistory(t *testing.T) {
|
|||||||
}
|
}
|
||||||
messages = append(messages, windsurf.ChatMessage{Role: role, Content: bulk})
|
messages = append(messages, windsurf.ChatMessage{Role: role, Content: bulk})
|
||||||
}
|
}
|
||||||
// Latest user message (the one actually being answered).
|
|
||||||
messages = append(messages, windsurf.ChatMessage{Role: "user", Content: "final question"})
|
messages = append(messages, windsurf.ChatMessage{Role: "user", Content: "final question"})
|
||||||
|
|
||||||
normalText := buildCascadeText(messages, "claude-sonnet-4", false, false)
|
text := buildCascadeText(messages, "claude-sonnet-4")
|
||||||
switchoverText := buildCascadeText(messages, "claude-sonnet-4", false, true)
|
|
||||||
|
|
||||||
if len(normalText) >= len(switchoverText) {
|
if !strings.Contains(text, "final question") {
|
||||||
t.Fatalf("switchover text (%d bytes) must be larger than normal (%d bytes)",
|
t.Fatal("text must include the final user message")
|
||||||
len(switchoverText), len(normalText))
|
|
||||||
}
|
}
|
||||||
if len(normalText) > cascadeMaxHistoryBytes+perTurnBytes {
|
if !strings.Contains(text, "sys") {
|
||||||
t.Fatalf("normal text (%d bytes) must fit near %d budget", len(normalText), cascadeMaxHistoryBytes)
|
t.Fatal("text must include the system prompt")
|
||||||
}
|
|
||||||
if len(switchoverText) < perTurnBytes*turns {
|
|
||||||
t.Fatalf("switchover text (%d bytes) dropped turns; expected >= %d (all %d turns kept)",
|
|
||||||
len(switchoverText), perTurnBytes*turns, turns)
|
|
||||||
}
|
|
||||||
if len(switchoverText) > cascadeSwitchoverHistoryBytes+perTurnBytes {
|
|
||||||
t.Fatalf("switchover text (%d bytes) exceeded budget %d", len(switchoverText), cascadeSwitchoverHistoryBytes)
|
|
||||||
}
|
|
||||||
// Final user message must always be preserved (it's the question being asked).
|
|
||||||
if !strings.Contains(switchoverText, "final question") {
|
|
||||||
t.Fatal("switchover text must include the final user message")
|
|
||||||
}
|
|
||||||
if !strings.Contains(normalText, "final question") {
|
|
||||||
t.Fatal("normal text must include the final user message")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resume mode ignores switchover — only the last user message is sent because
|
func TestBuildCascadeText_SingleTurn(t *testing.T) {
|
||||||
// Cascade server already has the history for the reused cascade_id.
|
|
||||||
func TestBuildCascadeText_ResumeIgnoresSwitchover(t *testing.T) {
|
|
||||||
messages := []windsurf.ChatMessage{
|
messages := []windsurf.ChatMessage{
|
||||||
{Role: "user", Content: "first"},
|
{Role: "system", Content: "be helpful"},
|
||||||
{Role: "assistant", Content: "reply"},
|
{Role: "user", Content: "hello"},
|
||||||
{Role: "user", Content: "second question"},
|
|
||||||
}
|
}
|
||||||
|
got := buildCascadeText(messages, "claude-sonnet-4")
|
||||||
got := buildCascadeText(messages, "claude-sonnet-4", true, true)
|
if !strings.Contains(got, "hello") {
|
||||||
if got != "second question" {
|
t.Fatal("single turn text must contain user message")
|
||||||
t.Fatalf("resume=true must return only last user message, got %q", got)
|
}
|
||||||
|
if !strings.Contains(got, "be helpful") {
|
||||||
|
t.Fatal("single turn text must contain system prompt")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,23 +122,140 @@ func TestInjectModelIdentity(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCascadeHistoryBudget(t *testing.T) {
|
// reuse 路径只送 system + 最后一条 user,不携带历史
|
||||||
tests := []struct {
|
func TestBuildCascadeTextForReuse_SystemAndLastUser(t *testing.T) {
|
||||||
name string
|
messages := []windsurf.ChatMessage{
|
||||||
modelUID string
|
{Role: "system", Content: "be helpful"},
|
||||||
switchover bool
|
{Role: "user", Content: "first turn"},
|
||||||
want int
|
{Role: "assistant", Content: "first response"},
|
||||||
}{
|
{Role: "user", Content: "second turn"},
|
||||||
{"normal model normal budget", "claude-sonnet-4", false, cascadeMaxHistoryBytes},
|
|
||||||
{"1m model normal budget", "claude-sonnet-4-1m", false, cascade1MHistoryBytes},
|
|
||||||
{"normal model switchover", "claude-sonnet-4", true, cascadeSwitchoverHistoryBytes},
|
|
||||||
{"1m model switchover", "claude-sonnet-4-1m", true, cascadeSwitchoverHistoryBytes},
|
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
got := buildCascadeTextForReuse(messages)
|
||||||
|
|
||||||
|
if !strings.Contains(got, "second turn") {
|
||||||
|
t.Fatal("reuse text must contain the latest user message")
|
||||||
|
}
|
||||||
|
if !strings.Contains(got, "be helpful") {
|
||||||
|
t.Fatal("reuse text must contain the system prompt")
|
||||||
|
}
|
||||||
|
if strings.Contains(got, "first turn") || strings.Contains(got, "first response") {
|
||||||
|
t.Fatalf("reuse text must NOT carry prior history (Cascade trajectory has it). got=%q", got)
|
||||||
|
}
|
||||||
|
if !strings.Contains(got, "<system_instructions>") {
|
||||||
|
t.Fatalf("reuse text must wrap system in <system_instructions> tag. got=%q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildCascadeTextForReuse_NoSystem(t *testing.T) {
|
||||||
|
messages := []windsurf.ChatMessage{
|
||||||
|
{Role: "user", Content: "hello"},
|
||||||
|
}
|
||||||
|
got := buildCascadeTextForReuse(messages)
|
||||||
|
if got != "hello" {
|
||||||
|
t.Fatalf("expected raw user message when no system; got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildCascadeCacheKey_Stable(t *testing.T) {
|
||||||
|
a := buildCascadeCacheKey(1, 2, "claude-sonnet", "http://localhost:42100", "sess-x", "syshash-x")
|
||||||
|
b := buildCascadeCacheKey(1, 2, "claude-sonnet", "http://localhost:42100", "sess-x", "syshash-x")
|
||||||
|
if a != b {
|
||||||
|
t.Fatalf("same inputs must yield same key; %q vs %q", a, b)
|
||||||
|
}
|
||||||
|
if len(a) != 24 {
|
||||||
|
t.Fatalf("cache key length expected 24, got %d (%q)", len(a), a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheKey 任一组件变化都必须产生不同的 key(避免错误复用)
|
||||||
|
func TestBuildCascadeCacheKey_DifferentInputsDiffer(t *testing.T) {
|
||||||
|
base := buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess1", "sys1")
|
||||||
|
cases := map[string]string{
|
||||||
|
"groupID": buildCascadeCacheKey(99, 2, "model-a", "ep1", "sess1", "sys1"),
|
||||||
|
"accountID": buildCascadeCacheKey(1, 99, "model-a", "ep1", "sess1", "sys1"),
|
||||||
|
"modelUID": buildCascadeCacheKey(1, 2, "model-b", "ep1", "sess1", "sys1"),
|
||||||
|
"lsEndpoint": buildCascadeCacheKey(1, 2, "model-a", "ep2", "sess1", "sys1"),
|
||||||
|
"sessionHash": buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess2", "sys1"),
|
||||||
|
"sysHash": buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess1", "sys2"),
|
||||||
|
}
|
||||||
|
for name, k := range cases {
|
||||||
|
if k == base {
|
||||||
|
t.Fatalf("changing %s must produce a different cache key", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSysPromptHash_DetectsSystemChange(t *testing.T) {
|
||||||
|
a := sysPromptHash([]windsurf.ChatMessage{
|
||||||
|
{Role: "system", Content: "be helpful"},
|
||||||
|
{Role: "user", Content: "hi"},
|
||||||
|
})
|
||||||
|
b := sysPromptHash([]windsurf.ChatMessage{
|
||||||
|
{Role: "system", Content: "be helpful"},
|
||||||
|
{Role: "user", Content: "different user msg — should NOT affect sys hash"},
|
||||||
|
})
|
||||||
|
c := sysPromptHash([]windsurf.ChatMessage{
|
||||||
|
{Role: "system", Content: "be VERY helpful"},
|
||||||
|
{Role: "user", Content: "hi"},
|
||||||
|
})
|
||||||
|
if a != b {
|
||||||
|
t.Fatalf("sysPromptHash must ignore non-system content; %q vs %q", a, b)
|
||||||
|
}
|
||||||
|
if a == c {
|
||||||
|
t.Fatalf("sysPromptHash must reflect system content changes; both are %q", a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 多条 system 拼接顺序敏感(合并成 multipart 时不应让两段交换后产生相同 hash)
|
||||||
|
func TestSysPromptHash_OrderSensitive(t *testing.T) {
|
||||||
|
a := sysPromptHash([]windsurf.ChatMessage{
|
||||||
|
{Role: "system", Content: "alpha"},
|
||||||
|
{Role: "system", Content: "beta"},
|
||||||
|
})
|
||||||
|
b := sysPromptHash([]windsurf.ChatMessage{
|
||||||
|
{Role: "system", Content: "beta"},
|
||||||
|
{Role: "system", Content: "alpha"},
|
||||||
|
})
|
||||||
|
if a == b {
|
||||||
|
t.Fatalf("sysPromptHash must be order-sensitive across multiple system messages")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsPanelNotFound(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
err error
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{"nil", nil, false},
|
||||||
|
{"unrelated error", errors.New("network unreachable"), false},
|
||||||
|
{"exact phrase", errors.New("Cascade panel state not found"), true},
|
||||||
|
{"lower case variant", errors.New("panel state not found: id=abc"), true},
|
||||||
|
{"not_found code", errors.New("rpc error: code=not_found, panel missing"), true},
|
||||||
|
{"missing one keyword", errors.New("not_found"), false},
|
||||||
|
}
|
||||||
|
for _, tt := range cases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
if got := cascadeHistoryBudget(tt.modelUID, tt.switchover); got != tt.want {
|
if got := isPanelNotFound(tt.err); got != tt.want {
|
||||||
t.Errorf("cascadeHistoryBudget(%q, %v) = %d, want %d",
|
t.Fatalf("isPanelNotFound(%v) = %v, want %v", tt.err, got, tt.want)
|
||||||
tt.modelUID, tt.switchover, got, tt.want)
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCascadeHistoryBudget(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
modelUID string
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{"normal model", "claude-sonnet-4", cascadeMaxHistoryBytes},
|
||||||
|
{"1m model", "claude-sonnet-4-1m", cascade1MHistoryBytes},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
if got := cascadeHistoryBudget(tt.modelUID); got != tt.want {
|
||||||
|
t.Errorf("cascadeHistoryBudget(%q) = %d, want %d", tt.modelUID, got, tt.want)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -36,7 +36,10 @@ func NewWindsurfGatewayService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, _ bool) (*ForwardResult, error) {
|
// Forward 处理 Windsurf 平台的 Anthropic-兼容请求。
|
||||||
|
// groupID 与 sessionHash 用于 Cascade 多轮复用:在同一 sticky session 上复用上游 LS cascade,
|
||||||
|
// 跳过 StartCascade 的额外 RPC,并避免每轮把 full-history 重灌进 trajectory。空值表示不复用。
|
||||||
|
func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, _ bool, groupID int64, sessionHash string) (*ForwardResult, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
reqLog := windsurfLogger(c, "windsurf_gateway.forward",
|
reqLog := windsurfLogger(c, "windsurf_gateway.forward",
|
||||||
zap.Int64("account_id", account.ID),
|
zap.Int64("account_id", account.ID),
|
||||||
@ -228,6 +231,8 @@ func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, ac
|
|||||||
|
|
||||||
chatReq := &WindsurfChatRequest{
|
chatReq := &WindsurfChatRequest{
|
||||||
AccountID: account.ID,
|
AccountID: account.ID,
|
||||||
|
GroupID: groupID,
|
||||||
|
SessionHash: sessionHash,
|
||||||
Model: req.Model,
|
Model: req.Model,
|
||||||
Messages: chatMessages,
|
Messages: chatMessages,
|
||||||
Stream: req.Stream,
|
Stream: req.Stream,
|
||||||
|
|||||||
@ -562,11 +562,11 @@ func ProvideWindsurfTokenProvider(cfg *config.Config, accountRepo AccountReposit
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProvideWindsurfChatService creates WindsurfChatService (nil when disabled).
|
// ProvideWindsurfChatService creates WindsurfChatService (nil when disabled).
|
||||||
func ProvideWindsurfChatService(cfg *config.Config, lsService *WindsurfLSService, tokenProvider *WindsurfTokenProvider) *WindsurfChatService {
|
func ProvideWindsurfChatService(cfg *config.Config, lsService *WindsurfLSService, tokenProvider *WindsurfTokenProvider, cache GatewayCache) *WindsurfChatService {
|
||||||
if !cfg.Windsurf.Enabled || lsService == nil || tokenProvider == nil {
|
if !cfg.Windsurf.Enabled || lsService == nil || tokenProvider == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return NewWindsurfChatService(cfg.Windsurf, lsService, tokenProvider)
|
return NewWindsurfChatService(cfg.Windsurf, lsService, tokenProvider, cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProvideWindsurfGatewayService creates WindsurfGatewayService (nil when disabled).
|
// ProvideWindsurfGatewayService creates WindsurfGatewayService (nil when disabled).
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user