From 6327573534d903efc1d63d0e3af92d683d3d293e Mon Sep 17 00:00:00 2001 From: alfadb Date: Tue, 28 Apr 2026 19:12:48 +0800 Subject: [PATCH] fix(gateway): wrap Anthropic stream EOF as failover error before client output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Anthropic streaming path (gateway_service.go) returned a plain error on upstream SSE read failure, so the handler-level UpstreamFailoverError check never fired and the client received a bare `stream_read_error` event, breaking long-running tasks even when no bytes had been written yet. The most common trigger is HTTP/2 GOAWAY from api.anthropic.com edge backends doing graceful rotation: Go's http.Transport surfaces this as `unexpected EOF` and never auto-retries. Mirror what the OpenAI and antigravity gateways already do: when the read error happens before any byte has reached the client (`!c.Writer.Written()`), return `*UpstreamFailoverError{StatusCode: 502, RetryableOnSameAccount: true}` so the handler can retry on the same or another account. After client output has begun, SSE has no resume protocol — keep the existing passthrough behavior. Tests cover both branches via streamReadCloser-based fixtures. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/service/gateway_service.go | 14 ++++ .../service/gateway_streaming_test.go | 70 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 6be19ba6..911bc6fc 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -7041,6 +7041,20 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http sendErrorEvent("response_too_large") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err } + // Upstream mid-stream read error (unexpected EOF / connection reset, etc.; commonly caused by HTTP/2 GOAWAY): + // if no byte has been written to the client yet, wrap it as UpstreamFailoverError so the handler layer can fail over or retry. + // Once streaming to the client has begun, SSE has no resume mechanism, so the error event can only be passed through to the client. + if !c.Writer.Written() { + logger.LegacyPrintf("service.gateway", "Upstream stream read error before any client output (account=%d), failing over: %v", account.ID, ev.err) + body, _ := json.Marshal(map[string]string{ + "error": fmt.Sprintf("upstream stream disconnected: %s", ev.err), + }) + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: body, + RetryableOnSameAccount: true, + } + } sendErrorEvent("stream_read_error") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream read error: %w", ev.err) } diff --git a/backend/internal/service/gateway_streaming_test.go b/backend/internal/service/gateway_streaming_test.go index b1584827..389831fa 100644 --- a/backend/internal/service/gateway_streaming_test.go +++ b/backend/internal/service/gateway_streaming_test.go @@ -4,9 +4,11 @@ package service import ( "context" + "errors" "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -218,3 +220,71 @@ func TestHandleStreamingResponse_SpecialCharactersInJSON(t *testing.T) { body := rec.Body.String() require.Contains(t, body, "content_block_delta", "响应应包含转发的 SSE 事件") } + +// 
An upstream mid-stream read error (e.g. an unexpected EOF triggered by HTTP/2 GOAWAY) occurs before any byte is written to the client: +// the gateway should return *UpstreamFailoverError to trigger account failover or retry instead of sending the error event straight to the client. +func TestHandleStreamingResponse_StreamReadErrorBeforeOutput_TriggersFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + svc := newMinimalGatewayService() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{err: io.ErrUnexpectedEOF}, + } + + result, err := svc.handleStreamingResponse(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "model", "model", false) + + require.Error(t, err) + require.Nil(t, result, "失败移交场景下不应返回 streamingResult") + + var failoverErr *UpstreamFailoverError + require.True(t, errors.As(err, &failoverErr), "未输出过字节时 stream read error 必须包成 UpstreamFailoverError,期望: %v", err) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.True(t, failoverErr.RetryableOnSameAccount, "GOAWAY 类错误应允许同账号重试") + require.Contains(t, string(failoverErr.ResponseBody), "upstream stream disconnected") + + // The client must not receive any stream_read_error event; the handler layer decides what to do next based on the failover outcome. + require.NotContains(t, rec.Body.String(), "stream_read_error") +} + +// A read error occurs after the upstream has already sent events (c.Writer has already written bytes): +// SSE has no resume mechanism, so the gateway can only pass the stream_read_error event through to the client and must not fail over. +func TestHandleStreamingResponse_StreamReadErrorAfterOutput_PassesThrough(t *testing.T) { + gin.SetMode(gin.TestMode) + svc := newMinimalGatewayService() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + // Intended fixture behavior: the first Read returns a complete SSE event so the gateway writes bytes to the client, then the second Read returns the EOF error. + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{ + payload: []byte("data: 
{\"type\":\"message_start\",\"message\":{\"usage\":{\"input_tokens\":5}}}\n\n"), + err: io.ErrUnexpectedEOF, + }, + } + + result, err := svc.handleStreamingResponse(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "model", "model", false) + + require.Error(t, err) + require.Contains(t, err.Error(), "stream read error", "已开始流后应透传普通 stream read error") + require.NotNil(t, result, "透传场景下应返回已收集的 streamingResult") + + // Must not be mistakenly wrapped as a failover error. + var failoverErr *UpstreamFailoverError + require.False(t, errors.As(err, &failoverErr), "已经向客户端写过字节时不能再 failover") + + // The client must receive the stream_read_error event. + body := rec.Body.String() + require.True(t, + strings.Contains(body, "stream_read_error"), + "已开始流后必须发送 stream_read_error 事件给客户端,实际响应: %q", body) +}