fix(openai): also emit response.failed in ensureForwardErrorResponse after Writer.Written

Case B: when a slot wait flushes SSE ping comments first (Writer.Written
becomes true), the previous ensureForwardErrorResponse short-circuited
on `c.Writer.Written()` and returned false without notifying the client.
Subsequent upstream errors (http2 timeout, stream INTERNAL_ERROR, etc.)
produced silent EOF; Codex CLI reported "stream closed before
response.completed" just like the user-slot timeout case.

Remove the Written() early return; coerce streamStarted to true when
Writer has already been written to, and let handleStreamingAwareError
walk the existing logic — which now (thanks to the previous commits)
emits a protocol-compliant response.failed for /responses paths and the
legacy `event: error` for others.

Update tests that previously asserted "do not override written response":
the new contract is to *append* an SSE terminal frame so the client sees
a clean close instead of EOF. recoverResponsesPanic inherits this fix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Jamie Wong 2026-05-24 22:00:56 +08:00
parent cff2f291be
commit b34cc71bee
4 changed files with 79 additions and 10 deletions

View File

@ -1446,10 +1446,16 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e
}
// ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
// Writer 已被写过时ping 已 flush走 streamStarted 分支,
// 让 handleStreamingAwareError 通过 SSE 发协议合规的终止事件,
// 否则下游收到的就是 silent EOF。
func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
if c == nil || c.Writer == nil || c.Writer.Written() {
if c == nil || c.Writer == nil {
return false
}
if c.Writer.Written() {
streamStarted = true
}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
return true
}

View File

@ -33,7 +33,9 @@ func TestGatewayEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testi
assert.Equal(t, "Upstream request failed", errorObj["message"])
}
func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
// Writer 已写后 ensureForwardErrorResponse 必须把错误以 SSE 形式追加,
// 而不是 silent EOF。非 /responses 路径走 legacy data:{"type":"error"} 分支。
func TestGatewayEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -43,7 +45,27 @@ func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *tes
h := &GatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.False(t, wrote)
require.True(t, wrote)
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
assert.Contains(t, w.Body.String(), "already written")
assert.Contains(t, w.Body.String(), `data: {"type":"error"`)
}
// case B 回归Anthropic-backed /responsesWriter 已被写过时
// ensureForwardErrorResponse 仍要发 response.failed。
func TestGatewayEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
_, _ = c.Writer.WriteString(":\n\n")
h := &GatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.True(t, wrote)
body := w.Body.String()
assert.Contains(t, body, ":\n\n")
assert.Contains(t, body, "event: response.failed\n")
assert.Contains(t, body, `"type":"response.failed"`)
}

View File

@ -1719,9 +1719,17 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status
// ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
func (h *OpenAIGatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
if c == nil || c.Writer == nil || c.Writer.Written() {
if c == nil || c.Writer == nil {
return false
}
// 旧实现在 Writer.Written 时直接 return false导致 ping 已 flush 之后的
// 上游错误http2 timeout、连接中断等完全无法把错误传给客户端——
// HTTP 200 已锁死TCP 直接 EOFCodex CLI 报 "stream closed before response.completed"。
// 这里改成Writer 已写过时强制走 streamStarted 分支,让
// handleStreamingAwareError 通过 SSE 发协议合规的 response.failed。
if c.Writer.Written() {
streamStarted = true
}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
return true
}

View File

@ -174,7 +174,11 @@ func TestOpenAIEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testin
assert.Equal(t, "Upstream request failed", errorObj["message"])
}
func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
// Writer 已写后 ensureForwardErrorResponse 必须仍然把错误信息以 SSE
// 形式追加给客户端streamStarted 强制 true
// 这是 case B 修复:旧实现遇到 Writer.Written 直接 return false
// 客户端只能拿到 silent EOFCodex CLI 报 "stream closed before response.completed"。
func TestOpenAIEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -184,9 +188,34 @@ func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *test
h := &OpenAIGatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.False(t, wrote)
require.True(t, wrote, "must attempt to communicate the failure to the client via SSE")
// 状态码改不了headers 已 flush但 body 应该追加 SSE 错误事件。
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
assert.Contains(t, w.Body.String(), "already written")
// 非 /responses 路径走 legacy event: error 分支。
assert.Contains(t, w.Body.String(), "event: error\n")
}
// case B 回归测试:/responses 路径Writer 已被写过(模拟 ping flushed
// ensureForwardErrorResponse 必须发 response.failed让 Codex 收到合规终止事件。
func TestOpenAIEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
// 模拟 ping 已 flush 的状态Writer 已写过 1 个字节
_, _ = c.Writer.WriteString(":\n\n")
h := &OpenAIGatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.True(t, wrote)
body := w.Body.String()
assert.Contains(t, body, ":\n\n", "earlier ping bytes preserved")
assert.Contains(t, body, "event: response.failed\n", "appended a Responses terminal event")
assert.Contains(t, body, `"type":"response.failed"`)
assert.Contains(t, body, `"code":"upstream_error"`)
assert.Contains(t, body, "Upstream request failed")
}
func TestShouldLogOpenAIForwardFailureAsWarn(t *testing.T) {
@ -266,7 +295,9 @@ func TestOpenAIRecoverResponsesPanic_NoPanicNoWrite(t *testing.T) {
assert.Equal(t, "", w.Body.String())
}
func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T) {
// Panic 在已 flush 的 /v1/responses 流中:状态码无法改(已 written
// 但 body 应追加 response.failed 让客户端识别为合规截断而不是 silent EOF。
func TestOpenAIRecoverResponsesPanic_AppendsResponseFailedAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@ -284,7 +315,9 @@ func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T
})
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
body := w.Body.String()
assert.Contains(t, body, "already written")
assert.Contains(t, body, "event: response.failed\n")
}
func TestOpenAIMissingResponsesDependencies(t *testing.T) {