fix(gemini): close tool_use block before text in messages streaming
When the Gemini->Anthropic streaming bridge for the /v1/messages endpoint receives a functionCall part followed by a text part, the text branch in handleStreamingResponse opened a new text content block without closing the already-open tool_use block. The tool block's content_block_stop was only emitted at end-of-stream, after the text block's content_block_start, so the Anthropic SSE stream contained overlapping/unterminated content blocks. Clients that assemble messages by block index (e.g. Claude Code) can drop the tool input or mis-parse the response. The functionCall branch already closes an open text block before opening a tool block, and the chat-completions sibling closes the tool block in its text branch via closeOpenTool(). This applies the same symmetric handling to the messages variant: close any open tool_use block (resetting openToolIndex/openToolName/ seenToolJSON) before starting text. Adds a regression test that replays a tool->text Gemini stream and asserts the Anthropic content-block lifecycle never overlaps.
This commit is contained in:
parent
1d46be02ae
commit
0a521f09fb
@ -2031,6 +2031,22 @@ func (s *GeminiMessagesCompatService) handleStreamingResponse(c *gin.Context, re
|
||||
parts := extractGeminiParts(geminiResp)
|
||||
for _, part := range parts {
|
||||
if text, ok := part["text"].(string); ok && text != "" {
|
||||
// Close an open tool_use block before starting text, mirroring
|
||||
// the functionCall branch (which closes open text blocks) and
|
||||
// the chat-completions sibling's closeOpenTool(). Otherwise a
|
||||
// tool→text sequence keeps the tool_use block open while the
|
||||
// text block starts, emitting overlapping Anthropic content
|
||||
// blocks that violate the SSE contract.
|
||||
if openToolIndex >= 0 {
|
||||
writeSSE(c.Writer, "content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": openToolIndex,
|
||||
})
|
||||
openToolIndex = -1
|
||||
openToolName = ""
|
||||
seenToolJSON = ""
|
||||
}
|
||||
|
||||
delta, newSeen := computeGeminiTextDelta(seenText, text)
|
||||
seenText = newSeen
|
||||
if delta == "" {
|
||||
|
||||
@ -832,3 +832,108 @@ func TestParseGeminiRateLimitResetTime(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestGeminiMessagesHandleStreamingResponse_ClosesToolBlockBeforeText guards the
|
||||
// tool→text ordering in the Gemini→Anthropic (messages) streaming bridge. When
|
||||
// Gemini emits a functionCall part followed by a text part, the tool_use content
|
||||
// block must be closed before the text block opens; otherwise the Anthropic SSE
|
||||
// stream contains overlapping content blocks. The chat-completions sibling
|
||||
// already enforces this via closeOpenTool().
|
||||
func TestGeminiMessagesHandleStreamingResponse_ClosesToolBlockBeforeText(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
upstreamBody := `data: {"candidates":[{"content":{"parts":[{"functionCall":{"name":"get_weather","args":{"city":"SF"}}}]}}]}` + "\n\n" +
|
||||
`data: {"candidates":[{"content":{"parts":[{"text":"All done."}]},"finishReason":"STOP"}],"usageMetadata":{"promptTokenCount":5,"candidatesTokenCount":3}}` + "\n\n" +
|
||||
"data: [DONE]\n\n"
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
|
||||
Body: io.NopCloser(strings.NewReader(upstreamBody)),
|
||||
}
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
svc := &GeminiMessagesCompatService{}
|
||||
result, err := svc.handleStreamingResponse(c, resp, time.Now(), "claude-3-5-sonnet")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
|
||||
events := parseAnthropicContentBlockEvents(t, rec.Body.String())
|
||||
|
||||
// Anthropic allows at most one content block open at a time: every
|
||||
// content_block_start must be matched by a content_block_stop before the
|
||||
// next start. Replay the lifecycle and assert there is no overlap.
|
||||
open := -1
|
||||
blockTypes := map[int]string{}
|
||||
textStarted := false
|
||||
toolClosed := false
|
||||
toolClosedBeforeText := false
|
||||
for _, ev := range events {
|
||||
switch ev.event {
|
||||
case "content_block_start":
|
||||
require.Equalf(t, -1, open,
|
||||
"content block %d opened while block %d was still open (overlapping blocks)", ev.index, open)
|
||||
open = ev.index
|
||||
blockTypes[ev.index] = ev.blockType
|
||||
if ev.blockType == "text" {
|
||||
textStarted = true
|
||||
if toolClosed {
|
||||
toolClosedBeforeText = true
|
||||
}
|
||||
}
|
||||
case "content_block_stop":
|
||||
require.Equalf(t, open, ev.index,
|
||||
"content_block_stop index %d does not match the open block %d", ev.index, open)
|
||||
if blockTypes[ev.index] == "tool_use" {
|
||||
toolClosed = true
|
||||
}
|
||||
open = -1
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, textStarted, "expected a text content block to be emitted after the tool call")
|
||||
require.True(t, toolClosedBeforeText, "tool_use block must be closed before the text block starts")
|
||||
require.Equal(t, -1, open, "stream ended with a content block still open")
|
||||
}
|
||||
|
||||
type anthropicContentBlockEvent struct {
|
||||
event string
|
||||
index int
|
||||
blockType string
|
||||
}
|
||||
|
||||
// parseAnthropicContentBlockEvents extracts content_block_start/stop events (with
|
||||
// their index and, for starts, the content block type) from an Anthropic SSE body.
|
||||
func parseAnthropicContentBlockEvents(t *testing.T, raw string) []anthropicContentBlockEvent {
|
||||
t.Helper()
|
||||
var events []anthropicContentBlockEvent
|
||||
for _, chunk := range strings.Split(raw, "\n\n") {
|
||||
var eventName, dataLine string
|
||||
for _, line := range strings.Split(chunk, "\n") {
|
||||
switch {
|
||||
case strings.HasPrefix(line, "event:"):
|
||||
eventName = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
|
||||
case strings.HasPrefix(line, "data:"):
|
||||
dataLine = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
||||
}
|
||||
}
|
||||
if eventName != "content_block_start" && eventName != "content_block_stop" {
|
||||
continue
|
||||
}
|
||||
var payload struct {
|
||||
Index int `json:"index"`
|
||||
ContentBlock struct {
|
||||
Type string `json:"type"`
|
||||
} `json:"content_block"`
|
||||
}
|
||||
require.NoError(t, json.Unmarshal([]byte(dataLine), &payload))
|
||||
events = append(events, anthropicContentBlockEvent{
|
||||
event: eventName,
|
||||
index: payload.Index,
|
||||
blockType: payload.ContentBlock.Type,
|
||||
})
|
||||
}
|
||||
return events
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user