From 0a521f09fbc481ccdd2a0d6bccfaa8f80df220fc Mon Sep 17 00:00:00 2001 From: Pluviobyte Date: Fri, 29 May 2026 06:46:49 +0000 Subject: [PATCH] fix(gemini): close tool_use block before text in messages streaming When the Gemini->Anthropic streaming bridge for the /v1/messages endpoint receives a functionCall part followed by a text part, the text branch in handleStreamingResponse opened a new text content block without closing the already-open tool_use block. The tool block's content_block_stop was only emitted at end-of-stream, after the text block's content_block_start, so the Anthropic SSE stream contained overlapping/unterminated content blocks. Clients that assemble messages by block index (e.g. Claude Code) can drop the tool input or mis-parse the response. The functionCall branch already closes an open text block before opening a tool block, and the chat-completions sibling closes the tool block in its text branch via closeOpenTool(). This applies the same symmetric handling to the messages variant: close any open tool_use block (resetting openToolIndex/openToolName/ seenToolJSON) before starting text. Adds a regression test that replays a tool->text Gemini stream and asserts the Anthropic content-block lifecycle never overlaps. --- .../service/gemini_messages_compat_service.go | 16 +++ .../gemini_messages_compat_service_test.go | 105 ++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 516556ca..64f19b2e 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -2031,6 +2031,22 @@ func (s *GeminiMessagesCompatService) handleStreamingResponse(c *gin.Context, re parts := extractGeminiParts(geminiResp) for _, part := range parts { if text, ok := part["text"].(string); ok && text != "" { + // Close an open tool_use block before starting text, mirroring + // the functionCall branch (which closes open text blocks) and + // the chat-completions sibling's closeOpenTool(). Otherwise a + // tool→text sequence keeps the tool_use block open while the + // text block starts, emitting overlapping Anthropic content + // blocks that violate the SSE contract. + if openToolIndex >= 0 { + writeSSE(c.Writer, "content_block_stop", map[string]any{ + "type": "content_block_stop", + "index": openToolIndex, + }) + openToolIndex = -1 + openToolName = "" + seenToolJSON = "" + } + delta, newSeen := computeGeminiTextDelta(seenText, text) seenText = newSeen if delta == "" { diff --git a/backend/internal/service/gemini_messages_compat_service_test.go b/backend/internal/service/gemini_messages_compat_service_test.go index d0560344..79db633a 100644 --- a/backend/internal/service/gemini_messages_compat_service_test.go +++ b/backend/internal/service/gemini_messages_compat_service_test.go @@ -832,3 +832,108 @@ func TestParseGeminiRateLimitResetTime(t *testing.T) { }) } } + +// TestGeminiMessagesHandleStreamingResponse_ClosesToolBlockBeforeText guards the +// tool→text ordering in the Gemini→Anthropic (messages) streaming bridge. When +// Gemini emits a functionCall part followed by a text part, the tool_use content +// block must be closed before the text block opens; otherwise the Anthropic SSE +// stream contains overlapping content blocks. The chat-completions sibling +// already enforces this via closeOpenTool(). +func TestGeminiMessagesHandleStreamingResponse_ClosesToolBlockBeforeText(t *testing.T) { + gin.SetMode(gin.TestMode) + + upstreamBody := `data: {"candidates":[{"content":{"parts":[{"functionCall":{"name":"get_weather","args":{"city":"SF"}}}]}}]}` + "\n\n" + + `data: {"candidates":[{"content":{"parts":[{"text":"All done."}]},"finishReason":"STOP"}],"usageMetadata":{"promptTokenCount":5,"candidatesTokenCount":3}}` + "\n\n" + + "data: [DONE]\n\n" + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + svc := &GeminiMessagesCompatService{} + result, err := svc.handleStreamingResponse(c, resp, time.Now(), "claude-3-5-sonnet") + require.NoError(t, err) + require.NotNil(t, result) + + events := parseAnthropicContentBlockEvents(t, rec.Body.String()) + + // Anthropic allows at most one content block open at a time: every + // content_block_start must be matched by a content_block_stop before the + // next start. Replay the lifecycle and assert there is no overlap. + open := -1 + blockTypes := map[int]string{} + textStarted := false + toolClosed := false + toolClosedBeforeText := false + for _, ev := range events { + switch ev.event { + case "content_block_start": + require.Equalf(t, -1, open, + "content block %d opened while block %d was still open (overlapping blocks)", ev.index, open) + open = ev.index + blockTypes[ev.index] = ev.blockType + if ev.blockType == "text" { + textStarted = true + if toolClosed { + toolClosedBeforeText = true + } + } + case "content_block_stop": + require.Equalf(t, open, ev.index, + "content_block_stop index %d does not match the open block %d", ev.index, open) + if blockTypes[ev.index] == "tool_use" { + toolClosed = true + } + open = -1 + } + } + + require.True(t, textStarted, "expected a text content block to be emitted after the tool call") + require.True(t, toolClosedBeforeText, "tool_use block must be closed before the text block starts") + require.Equal(t, -1, open, "stream ended with a content block still open") +} + +type anthropicContentBlockEvent struct { + event string + index int + blockType string +} + +// parseAnthropicContentBlockEvents extracts content_block_start/stop events (with +// their index and, for starts, the content block type) from an Anthropic SSE body. +func parseAnthropicContentBlockEvents(t *testing.T, raw string) []anthropicContentBlockEvent { + t.Helper() + var events []anthropicContentBlockEvent + for _, chunk := range strings.Split(raw, "\n\n") { + var eventName, dataLine string + for _, line := range strings.Split(chunk, "\n") { + switch { + case strings.HasPrefix(line, "event:"): + eventName = strings.TrimSpace(strings.TrimPrefix(line, "event:")) + case strings.HasPrefix(line, "data:"): + dataLine = strings.TrimSpace(strings.TrimPrefix(line, "data:")) + } + } + if eventName != "content_block_start" && eventName != "content_block_stop" { + continue + } + var payload struct { + Index int `json:"index"` + ContentBlock struct { + Type string `json:"type"` + } `json:"content_block"` + } + require.NoError(t, json.Unmarshal([]byte(dataLine), &payload)) + events = append(events, anthropicContentBlockEvent{ + event: eventName, + index: payload.Index, + blockType: payload.ContentBlock.Type, + }) + } + return events +}