package service import ( "context" "encoding/json" "errors" "fmt" "net/http" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/pkg/windsurf" "github.com/gin-gonic/gin" "go.uber.org/zap" ) type WindsurfGatewayService struct { chatService *WindsurfChatService cfg config.WindsurfConfig accountRepo AccountRepository } func NewWindsurfGatewayService( chatService *WindsurfChatService, cfg config.WindsurfConfig, accountRepo AccountRepository, ) *WindsurfGatewayService { return &WindsurfGatewayService{ chatService: chatService, cfg: cfg, accountRepo: accountRepo, } } func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, _ bool) (*ForwardResult, error) { startTime := time.Now() reqLog := windsurfLogger(c, "windsurf_gateway.forward", zap.Int64("account_id", account.ID), ) var req windsurfMessagesRequest if err := json.Unmarshal(body, &req); err != nil { s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Invalid request body") return nil, fmt.Errorf("unmarshal request: %w", err) } normalizeWindsurfRequest(&req) if strings.TrimSpace(req.Model) == "" { s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Missing model") return nil, fmt.Errorf("missing model") } reqLog = reqLog.With(zap.String("model", req.Model), zap.Bool("stream", req.Stream), zap.Int("tools_count", len(req.Tools))) // Convert Anthropic tools to OpenAI format var openAITools []windsurf.OpenAITool for _, t := range req.Tools { openAITools = append(openAITools, windsurf.OpenAITool{ Type: "function", Function: windsurf.OpenAIFunction{ Name: t.Name, Description: t.Description, Parameters: t.InputSchema, }, }) } hasTools := len(openAITools) > 0 // Convert Anthropic messages to intermediate form var anthropicMsgs []windsurf.AnthropicMessage hasToolHistory := false if len(req.System) > 0 { anthropicMsgs = append(anthropicMsgs, windsurf.AnthropicMessage{ Role: "system", Content: req.System, }) } for _, m := range req.Messages { contentBlocks := windsurfParseContentBlocks(m.Content) var toolResultMsgs []windsurf.AnthropicMessage var toolUseMsgs []windsurf.OpenAIToolCall var textParts []string for _, block := range contentBlocks { switch block.Type { case "tool_result": hasToolHistory = true resultContent := "" if block.Content != nil { resultContent = windsurfExtractContentTextFromRaw(block.Content) } contentJSON, _ := json.Marshal(resultContent) toolResultMsgs = append(toolResultMsgs, windsurf.AnthropicMessage{ Role: "tool", Content: contentJSON, ToolCallID: block.ToolUseID, }) case "tool_use": hasToolHistory = true inputJSON, _ := json.Marshal(block.Input) toolUseMsgs = append(toolUseMsgs, windsurf.OpenAIToolCall{ ID: block.ID, Type: "function", Function: windsurf.OpenAIToolCallFunc{ Name: block.Name, Arguments: string(inputJSON), }, }) case "text": textParts = append(textParts, block.Text) case "thinking": // skip default: if block.Text != "" { textParts = append(textParts, block.Text) } } } if len(toolUseMsgs) > 0 { contentJSON, _ := json.Marshal(strings.Join(textParts, "\n")) anthropicMsgs = append(anthropicMsgs, windsurf.AnthropicMessage{ Role: m.Role, Content: contentJSON, ToolCalls: toolUseMsgs, }) } else if len(toolResultMsgs) > 0 { for _, tr := range toolResultMsgs { anthropicMsgs = append(anthropicMsgs, tr) } } else { text := windsurfExtractContentText(m.Content) contentJSON, _ := json.Marshal(text) anthropicMsgs = append(anthropicMsgs, windsurf.AnthropicMessage{ Role: m.Role, Content: contentJSON, }) } } emulateTools := hasTools || hasToolHistory var chatMessages []windsurf.ChatMessage var toolPreamble string if emulateTools { toolPreamble = windsurf.BuildToolPreambleForProto(openAITools, req.ToolChoice) chatMessages = windsurf.NormalizeMessagesForCascade(anthropicMsgs, []windsurf.OpenAITool{}) reqLog.Info("windsurf_gateway.tool_emulation", zap.Int("tools_count", len(openAITools)), zap.Int("preamble_len", len(toolPreamble)), zap.Int("messages_count", len(chatMessages)), zap.Bool("has_tool_history", hasToolHistory), ) } else { for _, m := range anthropicMsgs { text := windsurfExtractContentText(json.RawMessage(m.Content)) chatMessages = append(chatMessages, windsurf.ChatMessage{ Role: m.Role, Content: text, }) } } chatReq := &WindsurfChatRequest{ AccountID: account.ID, Model: req.Model, Messages: chatMessages, Stream: req.Stream, Tools: openAITools, ToolPreamble: toolPreamble, } upstreamStart := time.Now() resp, err := s.chatService.Chat(ctx, chatReq) SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds()) if err != nil { reqLog.Error("windsurf_gateway.chat_failed", zap.Error(err)) appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: PlatformWindsurf, AccountID: account.ID, AccountName: account.Name, Kind: "http_error", Message: err.Error(), }) // CascadeModelError → set model rate limit + trigger account failover var modelErr *windsurf.CascadeModelError if errors.As(err, &modelErr) { modelKey := windsurf.ResolveModel(req.Model) cooldown := 5 * time.Minute if strings.Contains(modelErr.Msg, "stall") { cooldown = 60 * time.Second } resetAt := time.Now().Add(cooldown) if s.accountRepo != nil { if rlErr := s.accountRepo.SetModelRateLimit(ctx, account.ID, modelKey, resetAt); rlErr != nil { reqLog.Error("windsurf_gateway.set_model_rate_limit_failed", zap.Error(rlErr)) } else { reqLog.Info("windsurf_gateway.model_rate_limited", zap.String("model_key", modelKey), zap.Duration("cooldown", cooldown), ) } } setOpsUpstreamError(c, 502, modelErr.Msg, "") return nil, &UpstreamFailoverError{ StatusCode: 502, ResponseBody: []byte(modelErr.Msg), } } setOpsUpstreamError(c, http.StatusBadGateway, "Upstream LS request failed", err.Error()) s.writeClaudeError(c, http.StatusBadGateway, "api_error", "Upstream LS request failed") return nil, fmt.Errorf("chat: %w", err) } durationMs := time.Since(startTime).Milliseconds() if !resp.FirstTextAt.IsZero() { SetOpsLatencyMs(c, OpsTimeToFirstTokenMsKey, resp.FirstTextAt.Sub(startTime).Milliseconds()) } msgID := fmt.Sprintf("msg_ws_%d", time.Now().UnixNano()) // Prefer native structured tool calls from trajectory steps; // fallback to text-based parsing when none found. var parsed windsurf.FeedResult if len(resp.ToolCalls) > 0 { parsed.Text = resp.Text for _, tc := range resp.ToolCalls { parsed.ToolCalls = append(parsed.ToolCalls, windsurf.ToolCall{ ID: tc.ID, Name: tc.Name, ArgumentsJSON: tc.ArgumentsJSON, }) } reqLog.Info("windsurf_gateway.native_tool_calls", zap.Int("count", len(resp.ToolCalls)), ) } else { parsed = windsurf.ParseToolCallsFromText(resp.Text) } // Prefer server-reported usage; fallback to chars/4 estimate var inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int if resp.Usage != nil && (resp.Usage.InputTokens > 0 || resp.Usage.OutputTokens > 0) { inputTokens = resp.Usage.InputTokens outputTokens = resp.Usage.OutputTokens cacheReadTokens = resp.Usage.CacheReadTokens cacheWriteTokens = resp.Usage.CacheWriteTokens } else { inputTokens = windsurf.EstimateInputTokensFromMessages(chatMessages) outputTokens = windsurf.EstimateTokens(len(parsed.Text) + len(resp.Thinking)) } reqLog.Info("windsurf_gateway.completed", zap.Int64("duration_ms", durationMs), zap.String("upstream_model", resp.Model), zap.Int("text_len", len(parsed.Text)), zap.Int("thinking_len", len(resp.Thinking)), zap.Int("tool_calls_count", len(parsed.ToolCalls)), zap.Bool("native_tools", len(resp.ToolCalls) > 0), zap.Int("input_tokens", inputTokens), zap.Int("output_tokens", outputTokens), ) if req.Stream { s.streamAnthropicResponse(c, msgID, resp, parsed, inputTokens, outputTokens) } else { s.writeAnthropicResponse(c, msgID, resp, parsed, inputTokens, outputTokens) } upstreamModel := resp.Model if upstreamModel == req.Model { upstreamModel = "" } var firstTokenMs *int if !resp.FirstTextAt.IsZero() { ms := int(resp.FirstTextAt.Sub(startTime).Milliseconds()) firstTokenMs = &ms } return &ForwardResult{ RequestID: msgID, Usage: ClaudeUsage{ InputTokens: inputTokens, OutputTokens: outputTokens, CacheReadInputTokens: cacheReadTokens, CacheCreationInputTokens: cacheWriteTokens, }, Model: req.Model, UpstreamModel: upstreamModel, Stream: req.Stream, Duration: time.Since(startTime), FirstTokenMs: firstTokenMs, }, nil } func (s *WindsurfGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) { c.JSON(status, gin.H{ "type": "error", "error": gin.H{"type": errType, "message": message}, }) } func (s *WindsurfGatewayService) writeAnthropicResponse(c *gin.Context, id string, resp *WindsurfChatResponse, parsed windsurf.FeedResult, inputTokens, outputTokens int) { var content []gin.H if resp.Thinking != "" { content = append(content, gin.H{"type": "thinking", "thinking": resp.Thinking}) } if parsed.Text != "" { content = append(content, gin.H{"type": "text", "text": parsed.Text}) } for _, tc := range parsed.ToolCalls { var input interface{} if err := json.Unmarshal([]byte(tc.ArgumentsJSON), &input); err != nil { input = map[string]interface{}{} } content = append(content, gin.H{ "type": "tool_use", "id": tc.ID, "name": tc.Name, "input": input, }) } if len(content) == 0 { content = append(content, gin.H{"type": "text", "text": ""}) } stopReason := "end_turn" if len(parsed.ToolCalls) > 0 { stopReason = "tool_use" } c.JSON(http.StatusOK, gin.H{ "id": id, "type": "message", "role": "assistant", "model": resp.Model, "content": content, "stop_reason": stopReason, "stop_sequence": nil, "usage": gin.H{ "input_tokens": inputTokens, "output_tokens": outputTokens, }, }) } func (s *WindsurfGatewayService) streamAnthropicResponse(c *gin.Context, id string, resp *WindsurfChatResponse, parsed windsurf.FeedResult, inputTokens, outputTokens int) { c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") // 与 antigravity/gateway 保持一致,显式禁用 nginx/反代缓冲,防止 SSE 在代理侧被攒齐再转发 // 导致 Claude Code 等客户端长时间收不到任何帧而超时断开。 c.Header("X-Accel-Buffering", "no") writeSSE := func(event string, data any) { b, _ := json.Marshal(data) fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", event, b) c.Writer.Flush() } stopReason := "end_turn" if len(parsed.ToolCalls) > 0 { stopReason = "tool_use" } writeSSE("message_start", gin.H{ "type": "message_start", "message": gin.H{ "id": id, "type": "message", "role": "assistant", "model": resp.Model, "content": []any{}, "usage": gin.H{ "input_tokens": inputTokens, "output_tokens": outputTokens, }, }, }) blockIndex := 0 // Thinking block (reasoning_content) if resp.Thinking != "" { writeSSE("content_block_start", gin.H{ "type": "content_block_start", "index": blockIndex, "content_block": gin.H{"type": "thinking", "thinking": ""}, }) writeSSE("content_block_delta", gin.H{ "type": "content_block_delta", "index": blockIndex, "delta": gin.H{"type": "thinking_delta", "thinking": resp.Thinking}, }) writeSSE("content_block_stop", gin.H{ "type": "content_block_stop", "index": blockIndex, }) blockIndex++ } if parsed.Text != "" { writeSSE("content_block_start", gin.H{ "type": "content_block_start", "index": blockIndex, "content_block": gin.H{"type": "text", "text": ""}, }) writeSSE("content_block_delta", gin.H{ "type": "content_block_delta", "index": blockIndex, "delta": gin.H{"type": "text_delta", "text": parsed.Text}, }) writeSSE("content_block_stop", gin.H{ "type": "content_block_stop", "index": blockIndex, }) blockIndex++ } for _, tc := range parsed.ToolCalls { var input interface{} if err := json.Unmarshal([]byte(tc.ArgumentsJSON), &input); err != nil { input = map[string]interface{}{} } writeSSE("content_block_start", gin.H{ "type": "content_block_start", "index": blockIndex, "content_block": gin.H{ "type": "tool_use", "id": tc.ID, "name": tc.Name, "input": map[string]interface{}{}, }, }) writeSSE("content_block_delta", gin.H{ "type": "content_block_delta", "index": blockIndex, "delta": gin.H{"type": "input_json_delta", "partial_json": tc.ArgumentsJSON}, }) writeSSE("content_block_stop", gin.H{ "type": "content_block_stop", "index": blockIndex, }) blockIndex++ } if blockIndex == 0 { writeSSE("content_block_start", gin.H{ "type": "content_block_start", "index": 0, "content_block": gin.H{"type": "text", "text": ""}, }) writeSSE("content_block_stop", gin.H{ "type": "content_block_stop", "index": 0, }) } writeSSE("message_delta", gin.H{ "type": "message_delta", "delta": gin.H{"stop_reason": stopReason, "stop_sequence": nil}, "usage": gin.H{"output_tokens": outputTokens}, }) writeSSE("message_stop", gin.H{ "type": "message_stop", }) } // ---- Request types ---- type windsurfMessagesRequest struct { Model string `json:"model"` Stream bool `json:"stream"` System json.RawMessage `json:"system"` Messages []windsurfRequestMessage `json:"messages"` Tools []windsurfRequestTool `json:"tools,omitempty"` ToolChoice interface{} `json:"tool_choice,omitempty"` MaxTokens int `json:"max_tokens"` } type windsurfRequestMessage struct { Role string `json:"role"` Content json.RawMessage `json:"content"` ToolCallID string `json:"tool_call_id,omitempty"` } type windsurfRequestTool struct { Name string `json:"name"` Description string `json:"description"` InputSchema json.RawMessage `json:"input_schema"` } // ---- Helper functions (prefixed to avoid collision with windsurf_gateway_handler.go) ---- type windsurfContentBlock struct { Type string `json:"type"` Text string `json:"text,omitempty"` ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` Input interface{} `json:"input,omitempty"` ToolUseID string `json:"tool_use_id,omitempty"` Content json.RawMessage `json:"content,omitempty"` } func windsurfParseContentBlocks(raw json.RawMessage) []windsurfContentBlock { if len(raw) == 0 { return nil } var s string if json.Unmarshal(raw, &s) == nil { return []windsurfContentBlock{{Type: "text", Text: s}} } var blocks []windsurfContentBlock if json.Unmarshal(raw, &blocks) == nil { return blocks } return []windsurfContentBlock{{Type: "text", Text: string(raw)}} } func normalizeWindsurfRequest(req *windsurfMessagesRequest) { if req == nil { return } req.Tools = normalizeWindsurfRequestTools(req.Tools) req.ToolChoice = normalizeWindsurfToolChoice(req.ToolChoice) for i := range req.Messages { req.Messages[i].Content = normalizeWindsurfMessageContent(req.Messages[i].Content) } } func normalizeWindsurfRequestTools(tools []windsurfRequestTool) []windsurfRequestTool { if len(tools) == 0 { return nil } out := make([]windsurfRequestTool, 0, len(tools)) seen := make(map[string]int, len(tools)) for _, tool := range tools { tool.Name = windsurf.NormalizeToolName(tool.Name) key := strings.ToLower(strings.TrimSpace(tool.Name)) if key == "" { continue } if idx, ok := seen[key]; ok { if out[idx].Description == "" { out[idx].Description = tool.Description } if len(out[idx].InputSchema) == 0 { out[idx].InputSchema = tool.InputSchema } continue } seen[key] = len(out) out = append(out, tool) } return out } func normalizeWindsurfToolChoice(toolChoice interface{}) interface{} { switch tc := toolChoice.(type) { case map[string]interface{}: normalized := make(map[string]interface{}, len(tc)) for key, value := range tc { normalized[key] = value } if name, ok := normalized["name"].(string); ok { normalized["name"] = windsurf.NormalizeToolName(name) } if fn, ok := normalized["function"].(map[string]interface{}); ok { nextFn := make(map[string]interface{}, len(fn)) for key, value := range fn { nextFn[key] = value } if name, ok := nextFn["name"].(string); ok { nextFn["name"] = windsurf.NormalizeToolName(name) } normalized["function"] = nextFn } return normalized default: return toolChoice } } func normalizeWindsurfMessageContent(raw json.RawMessage) json.RawMessage { if len(raw) == 0 { return raw } var text string if json.Unmarshal(raw, &text) == nil { return raw } var blocks []windsurfContentBlock if json.Unmarshal(raw, &blocks) != nil { return raw } changed := false for i := range blocks { if blocks[i].Type == "tool_use" { normalized := windsurf.NormalizeToolName(blocks[i].Name) if normalized != blocks[i].Name { blocks[i].Name = normalized changed = true } } } if !changed { return raw } updated, err := json.Marshal(blocks) if err != nil { return raw } return updated } func windsurfExtractContentText(raw json.RawMessage) string { var s string if json.Unmarshal(raw, &s) == nil { return s } var blocks []struct { Type string `json:"type"` Text string `json:"text"` } if json.Unmarshal(raw, &blocks) == nil { var out string for _, b := range blocks { if b.Type == "text" { out += b.Text } } return out } return string(raw) } func windsurfExtractContentTextFromRaw(raw json.RawMessage) string { if len(raw) == 0 { return "" } var s string if json.Unmarshal(raw, &s) == nil { return s } var blocks []struct { Type string `json:"type"` Text string `json:"text"` } if json.Unmarshal(raw, &blocks) == nil { textOnly := len(blocks) > 0 var parts []string for _, b := range blocks { if b.Type != "text" { textOnly = false break } parts = append(parts, b.Text) } if textOnly { return strings.Join(parts, "\n") } } return string(raw) } func windsurfLogger(c *gin.Context, component string, fields ...zap.Field) *zap.Logger { l := logger.L().With(zap.String("component", component)) if c != nil { if reqID := c.GetHeader("X-Request-ID"); reqID != "" { l = l.With(zap.String("request_id", reqID)) } } return l.With(fields...) }