From 5e5c2062bf436be34b5bb8dc5603c6ead9af45cc Mon Sep 17 00:00:00 2001 From: Jamie Wong Date: Sun, 24 May 2026 10:58:29 +0800 Subject: [PATCH 1/3] fix(openai): emit response.failed for /v1/responses after stream started When /v1/responses streaming hits the user/account concurrency wait, the wait loop sends SSE ping comments to keep the connection alive, which flushes HTTP 200 + headers. If the wait then times out (or any other post-flush error fires), handleStreamingAwareError previously emitted a generic `event: error` frame. Codex CLI requires the stream to end with a Responses terminal event (response.completed/failed/incomplete/cancelled), so it reports "stream closed before response.completed" and the user-facing rate-limit intent is lost. This change detects inbound = /v1/responses in both handleStreamingAwareError implementations and emits a protocol-compliant response.failed event whose field set mirrors apicompat.makeResponsesCompletedEvent (id/object/model/status/output/error). The synthetic id reuses ctxkey.RequestID so client errors can be grepped against server logs. sequence_number is intentionally omitted to preserve monotonicity on streams that already emitted real events. Other inbound endpoints (/v1/chat/completions, /v1/messages) keep their legacy formats untouched. Co-Authored-By: Claude Opus 4.7 --- backend/internal/handler/gateway_handler.go | 8 + .../handler/openai_gateway_handler.go | 9 + .../internal/handler/stream_error_event.go | 111 ++++++++++ .../handler/stream_error_event_test.go | 202 ++++++++++++++++++ 4 files changed, 330 insertions(+) create mode 100644 backend/internal/handler/stream_error_event.go create mode 100644 backend/internal/handler/stream_error_event_test.go diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 4420a87c..970d7472 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -1420,6 +1420,14 @@ func (h *GatewayHandler) mapUpstreamError(statusCode int) (int, string, string) // handleStreamingAwareError handles errors that may occur after streaming has started func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) { if streamStarted { + // /v1/responses 的严格 SDK(Codex CLI)要求终止事件必须属于 + // response.completed/failed/incomplete/cancelled 集合。 + // Anthropic-backed Responses 路径同样会因为通用 error 帧被拒。 + if GetInboundEndpoint(c) == EndpointResponses { + if writeResponsesFailedSSE(c, errType, message) { + return + } + } // Stream already started, send error as SSE event then close flusher, ok := c.Writer.(http.Flusher) if ok { diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 9c5560f5..dd00a244 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -1691,6 +1691,15 @@ func (h *OpenAIGatewayHandler) mapUpstreamError(statusCode int) (int, string, st // handleStreamingAwareError handles errors that may occur after streaming has started func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) { if streamStarted { + // /v1/responses 的严格 SDK(Codex CLI)要求终止事件必须属于 + // response.completed/failed/incomplete/cancelled 集合。 + // 通用 `event: error` 帧不被识别为终止事件,会导致 + // "stream closed before response.completed"。 + if GetInboundEndpoint(c) == EndpointResponses { + if writeResponsesFailedSSE(c, errType, message) { + return + } + } // Stream already started, send error as SSE event then close flusher, ok := c.Writer.(http.Flusher) if ok { diff --git a/backend/internal/handler/stream_error_event.go b/backend/internal/handler/stream_error_event.go new file mode 100644 index 00000000..9c6378c2 --- /dev/null +++ b/backend/internal/handler/stream_error_event.go @@ -0,0 +1,111 @@ +package handler + +import ( + "fmt" + "net/http" + "strconv" + "strings" + + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// writeResponsesFailedSSE emits a `response.failed` SSE event in the OpenAI +// Responses API protocol after the stream has already started. +// +// 必要性:一旦 SSE 头和任意数据(例如等待槽位时的 ping comment)已经 flush, +// HTTP 200 状态码就被固化。此后若网关需要回报错误,只能继续通过 SSE 事件传达。 +// 通用的 `event: error` 帧不是 Responses 协议规定的终止事件, +// Codex CLI 等严格 SDK 会因为没收到 `response.completed/failed/incomplete/cancelled` +// 而抛出 "stream closed before response.completed"。 +// +// 字段集对齐 apicompat.makeResponsesCompletedEvent:id/object/model/status/output/error。 +// 故意不写 sequence_number:本函数被调用时无法可靠拿到当前流的 last sequence, +// 而 OpenAI spec 将 sequence_number 设为可选;省略避免破坏单调性约束。 +// +// 返回 true 表示已尝试 SSE 写出(不论 Write 是否成功,caller 都应直接 return)。 +// 返回 false 表示 writer 不支持 Flusher,无法以 SSE 形式回报错误; +// 此时 caller 也无法回退到 JSON(HTTP 200 已固化),通常意味着连接已经损坏, +// 应当让请求处理函数 return,由上层关闭连接。 +func writeResponsesFailedSSE(c *gin.Context, errType, message string) bool { + flusher, ok := c.Writer.(http.Flusher) + if !ok { + return false + } + rid := synthesizeResponseID(c) + model := requestModel(c) + code := mapResponsesErrorCode(errType) + + var b strings.Builder + b.Grow(256 + len(message) + len(model)) + b.WriteString(`{"type":"response.failed","response":{`) + b.WriteString(`"id":`) + b.WriteString(strconv.Quote(rid)) + b.WriteString(`,"object":"response"`) + if model != "" { + b.WriteString(`,"model":`) + b.WriteString(strconv.Quote(model)) + } + b.WriteString(`,"status":"failed","output":[],"error":{"code":`) + b.WriteString(strconv.Quote(code)) + b.WriteString(`,"message":`) + b.WriteString(strconv.Quote(message)) + b.WriteString(`}}}`) + + if _, err := fmt.Fprintf(c.Writer, "event: response.failed\ndata: %s\n\n", b.String()); err != nil { + _ = c.Error(err) + return true + } + flusher.Flush() + return true +} + +// synthesizeResponseID 为合成的 response.failed 事件生成一个稳定的 id。 +// 优先复用 server 端生成的 request_id(存在 request.Context 里,由 request_logger 写入), +// 以便客户端报错能与 server 日志关联;缺失时回退 uuid。 +func synthesizeResponseID(c *gin.Context) string { + if c != nil && c.Request != nil { + if rid, ok := c.Request.Context().Value(ctxkey.RequestID).(string); ok { + if rid = strings.TrimSpace(rid); rid != "" { + return "resp_" + strings.ReplaceAll(rid, "-", "") + } + } + } + return "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "") +} + +// requestModel 取当前请求的 inbound model(由 setOpsRequestContext 写入)。 +// 缺失时返回 "";caller 据此决定是否忽略该字段。 +func requestModel(c *gin.Context) string { + if c == nil { + return "" + } + if v, ok := c.Get(opsModelKey); ok { + if s, ok := v.(string); ok { + return strings.TrimSpace(s) + } + } + return "" +} + +// mapResponsesErrorCode 把内部 errType 映射为 Responses 协议常见的 error.code。 +// 无明确映射时原样返回,保证至少可读。 +func mapResponsesErrorCode(errType string) string { + switch errType { + case "rate_limit_error": + return "rate_limit_exceeded" + case "invalid_request_error": + return "invalid_request" + case "permission_error": + return "permission_denied" + case "authentication_error": + return "authentication_failed" + case "upstream_error": + return "upstream_error" + case "server_error", "api_error", "": + return "server_error" + default: + return errType + } +} diff --git a/backend/internal/handler/stream_error_event_test.go b/backend/internal/handler/stream_error_event_test.go new file mode 100644 index 00000000..e1abbc1d --- /dev/null +++ b/backend/internal/handler/stream_error_event_test.go @@ -0,0 +1,202 @@ +package handler + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Regression for the production incident on 2026-05-24 around 9:13 CST: +// user 16 sent /v1/responses with stream:true via Codex CLI; the user-concurrency +// slot wait sent SSE ping comments (flushing HTTP 200 + headers), then the 30s +// timeout fired and the handler emitted `event: error\ndata: {...}`. Codex CLI +// does not recognize that as a Responses terminal event and reports +// "stream closed before response.completed". The fix is to emit a synthetic +// response.failed event when the inbound endpoint is /v1/responses. + +func newGinContextForEndpoint(t *testing.T, endpoint string) (*gin.Context, *httptest.ResponseRecorder) { + t.Helper() + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, endpoint, nil) + return c, w +} + +// parseResponsesFailedSSE 抽出 SSE 中 data 行的 JSON,返回 (response 对象, error 对象)。 +func parseResponsesFailedSSE(t *testing.T, body string) (map[string]any, map[string]any) { + t.Helper() + require.True(t, strings.HasPrefix(body, "event: response.failed\n"), + "expect event: response.failed prefix, got: %q", body) + require.True(t, strings.HasSuffix(body, "\n\n")) + + lines := strings.SplitN(strings.TrimSuffix(body, "\n\n"), "\n", 2) + require.Len(t, lines, 2) + require.True(t, strings.HasPrefix(lines[1], "data: ")) + jsonStr := strings.TrimPrefix(lines[1], "data: ") + + var parsed map[string]any + require.NoError(t, json.Unmarshal([]byte(jsonStr), &parsed), "data must be valid JSON: %s", jsonStr) + + assert.Equal(t, "response.failed", parsed["type"]) + // 故意不发 sequence_number,避免与后续真实事件的序号冲突。 + _, hasSeq := parsed["sequence_number"] + assert.False(t, hasSeq, "synthetic event must not emit sequence_number") + + resp, ok := parsed["response"].(map[string]any) + require.True(t, ok, "response object missing") + assert.Equal(t, "response", resp["object"]) + assert.Equal(t, "failed", resp["status"]) + + errObj, ok := resp["error"].(map[string]any) + require.True(t, ok, "error object missing") + + return resp, errObj +} + +// OpenAI handler: /v1/responses streaming, after stream started, must emit response.failed. +func TestOpenAIHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", + "Concurrency limit exceeded for user, please retry later", true) + + resp, errObj := parseResponsesFailedSSE(t, w.Body.String()) + + id, _ := resp["id"].(string) + assert.True(t, strings.HasPrefix(id, "resp_"), "id should start with resp_, got %q", id) + assert.Equal(t, "rate_limit_exceeded", errObj["code"]) + assert.Equal(t, "Concurrency limit exceeded for user, please retry later", errObj["message"]) +} + +// 当 setOpsRequestContext 写过 model,合成事件应回填该字段(与 codebase 已有 makeResponsesCompletedEvent 对齐)。 +func TestOpenAIHandleStreamingAwareError_ResponsesStreamingIncludesModel(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + setOpsRequestContext(c, "gpt-5.5", true) + + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true) + + resp, _ := parseResponsesFailedSSE(t, w.Body.String()) + assert.Equal(t, "gpt-5.5", resp["model"]) +} + +// 没有 model 时 model 字段不应出现(避免发空字符串污染下游解析)。 +func TestOpenAIHandleStreamingAwareError_ResponsesStreamingOmitsEmptyModel(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true) + + resp, _ := parseResponsesFailedSSE(t, w.Body.String()) + _, hasModel := resp["model"] + assert.False(t, hasModel, "model field must be omitted when unknown") +} + +// 当 request.Context 携带 ctxkey.RequestID 时,合成 id 应与之关联,便于和 server log 串起来。 +func TestOpenAIHandleStreamingAwareError_ResponsesStreamingReusesRequestID(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + c.Request = c.Request.WithContext( + context.WithValue(c.Request.Context(), ctxkey.RequestID, "fd277bc5-ff7e-45d1-8aa9-f54e1df318f1"), + ) + + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "x", true) + + resp, _ := parseResponsesFailedSSE(t, w.Body.String()) + assert.Equal(t, "resp_fd277bc5ff7e45d18aa9f54e1df318f1", resp["id"]) +} + +// 与旧分支的 TestOpenAIHandleStreamingAwareError_JSONEscaping 对齐: +// 新的 response.failed payload 也必须正确转义 message 里的特殊字符, +// 否则下游 SDK 解析 JSON 时会失败。 +func TestOpenAIHandleStreamingAwareError_ResponsesStreamingJSONEscaping(t *testing.T) { + cases := []struct { + name string + errType string + message string + }{ + {"双引号", "server_error", `upstream returned "invalid" response`}, + {"反斜杠", "server_error", `path C:\Users\test\file.txt not found`}, + {"双引号+反斜杠", "upstream_error", `error parsing "key\value": unexpected token`}, + {"换行与制表", "server_error", "line1\nline2\ttab"}, + {"普通", "upstream_error", "Upstream service temporarily unavailable"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, tc.errType, tc.message, true) + + _, errObj := parseResponsesFailedSSE(t, w.Body.String()) + assert.Equal(t, tc.message, errObj["message"], "message 必须被原样还原") + }) + } +} + +// OpenAI handler: /v1/chat/completions streaming keeps the legacy event: error format +// (out of scope for this fix; covered to prevent regression of unrelated paths). +func TestOpenAIHandleStreamingAwareError_ChatCompletionsStreamingKeepsLegacy(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointChatCompletions) + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true) + + body := w.Body.String() + assert.True(t, strings.HasPrefix(body, "event: error\n"), "got: %q", body) +} + +// Gateway (Anthropic-backed) handler: /v1/responses path also must emit response.failed. +func TestGatewayHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointResponses) + h := &GatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "upstream gone", true) + + _, errObj := parseResponsesFailedSSE(t, w.Body.String()) + assert.Equal(t, "upstream_error", errObj["code"]) + assert.Equal(t, "upstream gone", errObj["message"]) +} + +// Gateway handler: /v1/messages preserves the legacy data:{type:error,...} format +// (Anthropic spec accepts a type:"error" stream event). +func TestGatewayHandleStreamingAwareError_MessagesStreamingKeepsLegacy(t *testing.T) { + c, w := newGinContextForEndpoint(t, EndpointMessages) + h := &GatewayHandler{} + h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true) + + body := w.Body.String() + assert.True(t, strings.HasPrefix(body, `data: {"type":"error"`), "got: %q", body) +} + +// Synthesized response.failed id falls back to uuid when no request_id is present. +func TestSynthesizeResponseID_FallbackUUID(t *testing.T) { + c, _ := newGinContextForEndpoint(t, EndpointResponses) + id := synthesizeResponseID(c) + assert.True(t, strings.HasPrefix(id, "resp_")) + // uuid 去掉短横线后 32 hex 字符;前缀 "resp_" 共 37。 + assert.Len(t, id, 37) +} + +func TestMapResponsesErrorCode(t *testing.T) { + cases := []struct{ in, out string }{ + {"rate_limit_error", "rate_limit_exceeded"}, + {"invalid_request_error", "invalid_request"}, + {"permission_error", "permission_denied"}, + {"authentication_error", "authentication_failed"}, + {"upstream_error", "upstream_error"}, + {"server_error", "server_error"}, + {"api_error", "server_error"}, + {"", "server_error"}, + {"custom_thing", "custom_thing"}, + } + for _, tc := range cases { + assert.Equal(t, tc.out, mapResponsesErrorCode(tc.in), "in=%q", tc.in) + } +} From cff2f291be21f9d8d3367cec949a1ecf97182ede Mon Sep 17 00:00:00 2001 From: Jamie Wong Date: Sun, 24 May 2026 19:32:08 +0800 Subject: [PATCH 2/3] fix(openai): also match bare /responses route in handleStreamingAwareError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first revision compared GetInboundEndpoint(c) against EndpointResponses ("/v1/responses"). NormalizeInboundEndpoint only recognizes paths that contain the literal "/v1/responses" substring, but the project actually registers six /responses routes — three of which (top-level r.POST("/responses", ...) and codexDirect's "/backend-api/codex/responses") have FullPath values without the "/v1" prefix and therefore fall through to the default branch. Codex CLI users targeting the bare /responses route at the production deployment (observed 2026-05-24 ~11:05 UTC, user 16) never reached the new writeResponsesFailedSSE path: the endpoint check was false, the legacy `event: error` frame fired, and the strict SDK kept reporting "stream closed before response.completed". Replace the strict equality check with inboundIsResponses(c), which uses suffix detection on FullPath (falling back to URL.Path when FullPath is empty in test fixtures) and covers all six route variants: /v1/responses[/...] /responses[/...] /backend-api/codex/responses[/...] Add test table covering all routes plus negative cases. Co-Authored-By: Claude Opus 4.7 --- backend/internal/handler/gateway_handler.go | 2 +- .../handler/openai_gateway_handler.go | 2 +- .../internal/handler/stream_error_event.go | 30 +++++++++++ .../handler/stream_error_event_test.go | 51 +++++++++++++++++++ 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 970d7472..be55e69b 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -1423,7 +1423,7 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e // /v1/responses 的严格 SDK(Codex CLI)要求终止事件必须属于 // response.completed/failed/incomplete/cancelled 集合。 // Anthropic-backed Responses 路径同样会因为通用 error 帧被拒。 - if GetInboundEndpoint(c) == EndpointResponses { + if inboundIsResponses(c) { if writeResponsesFailedSSE(c, errType, message) { return } diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index dd00a244..ea95b812 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -1695,7 +1695,7 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status // response.completed/failed/incomplete/cancelled 集合。 // 通用 `event: error` 帧不被识别为终止事件,会导致 // "stream closed before response.completed"。 - if GetInboundEndpoint(c) == EndpointResponses { + if inboundIsResponses(c) { if writeResponsesFailedSSE(c, errType, message) { return } diff --git a/backend/internal/handler/stream_error_event.go b/backend/internal/handler/stream_error_event.go index 9c6378c2..fc5bf61d 100644 --- a/backend/internal/handler/stream_error_event.go +++ b/backend/internal/handler/stream_error_event.go @@ -61,6 +61,36 @@ func writeResponsesFailedSSE(c *gin.Context, errType, message string) bool { return true } +// inboundIsResponses 判断当前请求是否落在任何 /responses 路由上。 +// +// 不能直接用 GetInboundEndpoint(c) == EndpointResponses 比较,因为 +// NormalizeInboundEndpoint 只识别包含 "/v1/responses" 子串的路径; +// 项目里实际注册了多组路由(gateway_v1、top-level bare、codex direct), +// 其中 r.POST("/responses", ...) 和 codexDirect.POST("/responses", ...) +// 的 c.FullPath() 不含 "/v1/" 前缀,会被归一化为原始路径, +// 导致协议合规终止事件没法发出去。 +// +// 这里用 FullPath 的后缀判断,覆盖所有变体: +// - /v1/responses +// - /v1/responses/compact +// - /responses +// - /responses/compact +// - /backend-api/codex/responses +// - /backend-api/codex/responses/compact +func inboundIsResponses(c *gin.Context) bool { + if c == nil { + return false + } + p := strings.TrimRight(c.FullPath(), "/") + if p == "" && c.Request != nil && c.Request.URL != nil { + p = strings.TrimRight(c.Request.URL.Path, "/") + } + if p == "" { + return false + } + return strings.HasSuffix(p, "/responses") || strings.Contains(p, "/responses/") +} + // synthesizeResponseID 为合成的 response.failed 事件生成一个稳定的 id。 // 优先复用 server 端生成的 request_id(存在 request.Context 里,由 request_logger 写入), // 以便客户端报错能与 server 日志关联;缺失时回退 uuid。 diff --git a/backend/internal/handler/stream_error_event_test.go b/backend/internal/handler/stream_error_event_test.go index e1abbc1d..721b5856 100644 --- a/backend/internal/handler/stream_error_event_test.go +++ b/backend/internal/handler/stream_error_event_test.go @@ -175,6 +175,57 @@ func TestGatewayHandleStreamingAwareError_MessagesStreamingKeepsLegacy(t *testin assert.True(t, strings.HasPrefix(body, `data: {"type":"error"`), "got: %q", body) } +// 项目里 /responses 注册在多组路由:/v1/responses(gateway)、裸 /responses(top-level)、 +// /backend-api/codex/responses(codex direct)。我们 fix 必须覆盖全部, +// 否则一些客户端走的路径就不会发 response.failed,照样报 stream closed。 +// 这是生产 2026-05-24 ~11:05 UTC user 16 实际命中的 bug。 +func TestInboundIsResponses_CoversAllRoutes(t *testing.T) { + cases := []struct { + route string + want bool + }{ + {"/v1/responses", true}, + {"/v1/responses/compact", true}, + {"/responses", true}, // <-- 用户 16 实际走这条 + {"/responses/compact", true}, + {"/backend-api/codex/responses", true}, + {"/backend-api/codex/responses/compact", true}, + {"/v1/chat/completions", false}, + {"/v1/messages", false}, + {"/", false}, + {"/responses-fake", false}, + } + for _, tc := range cases { + t.Run(tc.route, func(t *testing.T) { + c, _ := newGinContextForEndpoint(t, tc.route) + assert.Equal(t, tc.want, inboundIsResponses(c), "route=%q", tc.route) + }) + } +} + +// 用 c.Request.URL.Path 作为 fallback(当 c.FullPath() 为空时,例如某些测试 fixture)。 +func TestInboundIsResponses_FallsBackToURLPath(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, "/responses", nil) + // 这种情况下 c.FullPath() 是 "",必须 fallback 到 URL.Path + assert.True(t, inboundIsResponses(c), "URL.Path fallback must work when FullPath is empty") +} + +// 回归生产事故:用户 16 走 /responses 路径,必须发 response.failed。 +func TestOpenAIHandleStreamingAwareError_BareResponsesRouteEmitsResponseFailed(t *testing.T) { + c, w := newGinContextForEndpoint(t, "/responses") + h := &OpenAIGatewayHandler{} + h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", + "Concurrency limit exceeded for user, please retry later", true) + + resp, errObj := parseResponsesFailedSSE(t, w.Body.String()) + id, _ := resp["id"].(string) + assert.True(t, strings.HasPrefix(id, "resp_")) + assert.Equal(t, "rate_limit_exceeded", errObj["code"]) +} + // Synthesized response.failed id falls back to uuid when no request_id is present. func TestSynthesizeResponseID_FallbackUUID(t *testing.T) { c, _ := newGinContextForEndpoint(t, EndpointResponses) From b34cc71bee0f531f657ae1ff5c5eeed5c8d00c1d Mon Sep 17 00:00:00 2001 From: Jamie Wong Date: Sun, 24 May 2026 22:00:56 +0800 Subject: [PATCH 3/3] fix(openai): also emit response.failed in ensureForwardErrorResponse after Writer.Written MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Case B: when a slot wait flushes SSE ping comments first (Writer.Written becomes true), the previous ensureForwardErrorResponse short-circuited on `c.Writer.Written()` and returned false without notifying the client. Subsequent upstream errors (http2 timeout, stream INTERNAL_ERROR, etc.) produced silent EOF; Codex CLI reported "stream closed before response.completed" just like the user-slot timeout case. Remove the Written() early return; coerce streamStarted to true when Writer has already been written to, and let handleStreamingAwareError walk the existing logic — which now (thanks to the previous commits) emits a protocol-compliant response.failed for /responses paths and the legacy `event: error` for others. Update tests that previously asserted "do not override written response": the new contract is to *append* an SSE terminal frame so the client sees a clean close instead of EOF. recoverResponsesPanic inherits this fix. Co-Authored-By: Claude Opus 4.7 --- backend/internal/handler/gateway_handler.go | 8 +++- .../gateway_handler_error_fallback_test.go | 28 ++++++++++-- .../handler/openai_gateway_handler.go | 10 ++++- .../handler/openai_gateway_handler_test.go | 43 ++++++++++++++++--- 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index be55e69b..51c2d94e 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -1446,10 +1446,16 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e } // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。 +// Writer 已被写过时(ping 已 flush)走 streamStarted 分支, +// 让 handleStreamingAwareError 通过 SSE 发协议合规的终止事件, +// 否则下游收到的就是 silent EOF。 func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool { - if c == nil || c.Writer == nil || c.Writer.Written() { + if c == nil || c.Writer == nil { return false } + if c.Writer.Written() { + streamStarted = true + } h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted) return true } diff --git a/backend/internal/handler/gateway_handler_error_fallback_test.go b/backend/internal/handler/gateway_handler_error_fallback_test.go index 4fce5ec1..fe9e2ebf 100644 --- a/backend/internal/handler/gateway_handler_error_fallback_test.go +++ b/backend/internal/handler/gateway_handler_error_fallback_test.go @@ -33,7 +33,9 @@ func TestGatewayEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testi assert.Equal(t, "Upstream request failed", errorObj["message"]) } -func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) { +// Writer 已写后 ensureForwardErrorResponse 必须把错误以 SSE 形式追加, +// 而不是 silent EOF。非 /responses 路径走 legacy data:{"type":"error"} 分支。 +func TestGatewayEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -43,7 +45,27 @@ func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *tes h := &GatewayHandler{} wrote := h.ensureForwardErrorResponse(c, false) - require.False(t, wrote) + require.True(t, wrote) require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + assert.Contains(t, w.Body.String(), "already written") + assert.Contains(t, w.Body.String(), `data: {"type":"error"`) +} + +// case B 回归:Anthropic-backed /responses,Writer 已被写过时 +// ensureForwardErrorResponse 仍要发 response.failed。 +func TestGatewayEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil) + _, _ = c.Writer.WriteString(":\n\n") + + h := &GatewayHandler{} + wrote := h.ensureForwardErrorResponse(c, false) + + require.True(t, wrote) + body := w.Body.String() + assert.Contains(t, body, ":\n\n") + assert.Contains(t, body, "event: response.failed\n") + assert.Contains(t, body, `"type":"response.failed"`) } diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index ea95b812..5464d654 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -1719,9 +1719,17 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。 func (h *OpenAIGatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool { - if c == nil || c.Writer == nil || c.Writer.Written() { + if c == nil || c.Writer == nil { return false } + // 旧实现在 Writer.Written 时直接 return false,导致 ping 已 flush 之后的 + // 上游错误(http2 timeout、连接中断等)完全无法把错误传给客户端—— + // HTTP 200 已锁死,TCP 直接 EOF,Codex CLI 报 "stream closed before response.completed"。 + // 这里改成:Writer 已写过时强制走 streamStarted 分支,让 + // handleStreamingAwareError 通过 SSE 发协议合规的 response.failed。 + if c.Writer.Written() { + streamStarted = true + } h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted) return true } diff --git a/backend/internal/handler/openai_gateway_handler_test.go b/backend/internal/handler/openai_gateway_handler_test.go index 6bddbce9..49b7b9ec 100644 --- a/backend/internal/handler/openai_gateway_handler_test.go +++ b/backend/internal/handler/openai_gateway_handler_test.go @@ -174,7 +174,11 @@ func TestOpenAIEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testin assert.Equal(t, "Upstream request failed", errorObj["message"]) } -func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) { +// Writer 已写后 ensureForwardErrorResponse 必须仍然把错误信息以 SSE +// 形式追加给客户端(streamStarted 强制 true)。 +// 这是 case B 修复:旧实现遇到 Writer.Written 直接 return false, +// 客户端只能拿到 silent EOF;Codex CLI 报 "stream closed before response.completed"。 +func TestOpenAIEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -184,9 +188,34 @@ func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *test h := &OpenAIGatewayHandler{} wrote := h.ensureForwardErrorResponse(c, false) - require.False(t, wrote) + require.True(t, wrote, "must attempt to communicate the failure to the client via SSE") + // 状态码改不了(headers 已 flush),但 body 应该追加 SSE 错误事件。 require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + assert.Contains(t, w.Body.String(), "already written") + // 非 /responses 路径走 legacy event: error 分支。 + assert.Contains(t, w.Body.String(), "event: error\n") +} + +// case B 回归测试:/responses 路径,Writer 已被写过(模拟 ping flushed), +// ensureForwardErrorResponse 必须发 response.failed,让 Codex 收到合规终止事件。 +func TestOpenAIEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil) + // 模拟 ping 已 flush 的状态:Writer 已写过 1 个字节 + _, _ = c.Writer.WriteString(":\n\n") + + h := &OpenAIGatewayHandler{} + wrote := h.ensureForwardErrorResponse(c, false) + + require.True(t, wrote) + body := w.Body.String() + assert.Contains(t, body, ":\n\n", "earlier ping bytes preserved") + assert.Contains(t, body, "event: response.failed\n", "appended a Responses terminal event") + assert.Contains(t, body, `"type":"response.failed"`) + assert.Contains(t, body, `"code":"upstream_error"`) + assert.Contains(t, body, "Upstream request failed") } func TestShouldLogOpenAIForwardFailureAsWarn(t *testing.T) { @@ -266,7 +295,9 @@ func TestOpenAIRecoverResponsesPanic_NoPanicNoWrite(t *testing.T) { assert.Equal(t, "", w.Body.String()) } -func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T) { +// Panic 在已 flush 的 /v1/responses 流中:状态码无法改(已 written), +// 但 body 应追加 response.failed 让客户端识别为合规截断而不是 silent EOF。 +func TestOpenAIRecoverResponsesPanic_AppendsResponseFailedAfterWritten(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -284,7 +315,9 @@ func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T }) require.Equal(t, http.StatusTeapot, w.Code) - assert.Equal(t, "already written", w.Body.String()) + body := w.Body.String() + assert.Contains(t, body, "already written") + assert.Contains(t, body, "event: response.failed\n") } func TestOpenAIMissingResponsesDependencies(t *testing.T) {