From b34cc71bee0f531f657ae1ff5c5eeed5c8d00c1d Mon Sep 17 00:00:00 2001 From: Jamie Wong Date: Sun, 24 May 2026 22:00:56 +0800 Subject: [PATCH] fix(openai): also emit response.failed in ensureForwardErrorResponse after Writer.Written MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Case B: when a slot wait flushes SSE ping comments first (Writer.Written becomes true), the previous ensureForwardErrorResponse short-circuited on `c.Writer.Written()` and returned false without notifying the client. Subsequent upstream errors (http2 timeout, stream INTERNAL_ERROR, etc.) produced a silent EOF; Codex CLI reported "stream closed before response.completed" just like the user-slot timeout case. Remove the Written() early return; coerce streamStarted to true when the Writer has already been written to, and let handleStreamingAwareError run its existing logic — which now (thanks to the previous commits) emits a protocol-compliant response.failed for /responses paths and the legacy `event: error` for others. Update tests that previously asserted "do not override written response": the new contract is to *append* an SSE terminal frame so the client sees a clean close instead of a silent EOF. recoverResponsesPanic inherits this fix. 
Co-Authored-By: Claude Opus 4.7 --- backend/internal/handler/gateway_handler.go | 8 +++- .../gateway_handler_error_fallback_test.go | 28 ++++++++++-- .../handler/openai_gateway_handler.go | 10 ++++- .../handler/openai_gateway_handler_test.go | 43 ++++++++++++++++--- 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index be55e69b..51c2d94e 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -1446,10 +1446,16 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e } // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。 +// Writer 已被写过时(ping 已 flush)走 streamStarted 分支, +// 让 handleStreamingAwareError 通过 SSE 发协议合规的终止事件, +// 否则下游收到的就是 silent EOF。 func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool { - if c == nil || c.Writer == nil || c.Writer.Written() { + if c == nil || c.Writer == nil { return false } + if c.Writer.Written() { + streamStarted = true + } h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted) return true } diff --git a/backend/internal/handler/gateway_handler_error_fallback_test.go b/backend/internal/handler/gateway_handler_error_fallback_test.go index 4fce5ec1..fe9e2ebf 100644 --- a/backend/internal/handler/gateway_handler_error_fallback_test.go +++ b/backend/internal/handler/gateway_handler_error_fallback_test.go @@ -33,7 +33,9 @@ func TestGatewayEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testi assert.Equal(t, "Upstream request failed", errorObj["message"]) } -func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) { +// Writer 已写后 ensureForwardErrorResponse 必须把错误以 SSE 形式追加, +// 而不是 silent EOF。非 /responses 路径走 legacy data:{"type":"error"} 分支。 +func TestGatewayEnsureForwardErrorResponse_AppendsSSEAfterWritten(t 
*testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -43,7 +45,27 @@ func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *tes h := &GatewayHandler{} wrote := h.ensureForwardErrorResponse(c, false) - require.False(t, wrote) + require.True(t, wrote) require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + assert.Contains(t, w.Body.String(), "already written") + assert.Contains(t, w.Body.String(), `data: {"type":"error"`) +} + +// case B 回归:Anthropic-backed /responses,Writer 已被写过时 +// ensureForwardErrorResponse 仍要发 response.failed。 +func TestGatewayEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil) + _, _ = c.Writer.WriteString(":\n\n") + + h := &GatewayHandler{} + wrote := h.ensureForwardErrorResponse(c, false) + + require.True(t, wrote) + body := w.Body.String() + assert.Contains(t, body, ":\n\n") + assert.Contains(t, body, "event: response.failed\n") + assert.Contains(t, body, `"type":"response.failed"`) } diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index ea95b812..5464d654 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -1719,9 +1719,17 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。 func (h *OpenAIGatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool { - if c == nil || c.Writer == nil || c.Writer.Written() { + if c == nil || c.Writer == nil { return false } + // 旧实现在 Writer.Written 时直接 return false,导致 ping 已 flush 之后的 + // 上游错误(http2 timeout、连接中断等)完全无法把错误传给客户端—— + // HTTP 200 
已锁死,TCP 直接 EOF,Codex CLI 报 "stream closed before response.completed"。 + // 这里改成:Writer 已写过时强制走 streamStarted 分支,让 + // handleStreamingAwareError 通过 SSE 发协议合规的 response.failed。 + if c.Writer.Written() { + streamStarted = true + } h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted) return true } diff --git a/backend/internal/handler/openai_gateway_handler_test.go b/backend/internal/handler/openai_gateway_handler_test.go index 6bddbce9..49b7b9ec 100644 --- a/backend/internal/handler/openai_gateway_handler_test.go +++ b/backend/internal/handler/openai_gateway_handler_test.go @@ -174,7 +174,11 @@ func TestOpenAIEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testin assert.Equal(t, "Upstream request failed", errorObj["message"]) } -func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) { +// Writer 已写后 ensureForwardErrorResponse 必须仍然把错误信息以 SSE +// 形式追加给客户端(streamStarted 强制 true)。 +// 这是 case B 修复:旧实现遇到 Writer.Written 直接 return false, +// 客户端只能拿到 silent EOF;Codex CLI 报 "stream closed before response.completed"。 +func TestOpenAIEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -184,9 +188,34 @@ func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *test h := &OpenAIGatewayHandler{} wrote := h.ensureForwardErrorResponse(c, false) - require.False(t, wrote) + require.True(t, wrote, "must attempt to communicate the failure to the client via SSE") + // 状态码改不了(headers 已 flush),但 body 应该追加 SSE 错误事件。 require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + assert.Contains(t, w.Body.String(), "already written") + // 非 /responses 路径走 legacy event: error 分支。 + assert.Contains(t, w.Body.String(), "event: error\n") +} + +// case B 回归测试:/responses 路径,Writer 已被写过(模拟 ping flushed), +// ensureForwardErrorResponse 必须发 
response.failed,让 Codex 收到合规终止事件。 +func TestOpenAIEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil) + // 模拟 ping 已 flush 的状态:Writer 已写过一个 SSE 注释帧(3 个字节) + _, _ = c.Writer.WriteString(":\n\n") + + h := &OpenAIGatewayHandler{} + wrote := h.ensureForwardErrorResponse(c, false) + + require.True(t, wrote) + body := w.Body.String() + assert.Contains(t, body, ":\n\n", "earlier ping bytes preserved") + assert.Contains(t, body, "event: response.failed\n", "appended a Responses terminal event") + assert.Contains(t, body, `"type":"response.failed"`) + assert.Contains(t, body, `"code":"upstream_error"`) + assert.Contains(t, body, "Upstream request failed") } func TestShouldLogOpenAIForwardFailureAsWarn(t *testing.T) { @@ -266,7 +295,9 @@ func TestOpenAIRecoverResponsesPanic_NoPanicNoWrite(t *testing.T) { assert.Equal(t, "", w.Body.String()) } -func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T) { +// Panic 在已 flush 的 /v1/responses 流中:状态码无法改(已 written), +// 但 body 应追加 response.failed 让客户端识别为合规截断而不是 silent EOF。 +func TestOpenAIRecoverResponsesPanic_AppendsResponseFailedAfterWritten(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -284,7 +315,9 @@ func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T }) require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + body := w.Body.String() + assert.Contains(t, body, "already written") + assert.Contains(t, body, "event: response.failed\n") } func TestOpenAIMissingResponsesDependencies(t *testing.T) {