diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go
index 4420a87c..51c2d94e 100644
--- a/backend/internal/handler/gateway_handler.go
+++ b/backend/internal/handler/gateway_handler.go
@@ -1420,6 +1420,14 @@ func (h *GatewayHandler) mapUpstreamError(statusCode int) (int, string, string)
 // handleStreamingAwareError handles errors that may occur after streaming has started
 func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
 	if streamStarted {
+		// Strict /v1/responses SDKs (Codex CLI) require the terminal event to be one of
+		// response.completed/failed/incomplete/cancelled.
+		// The Anthropic-backed Responses path is rejected for a generic error frame as well.
+		if inboundIsResponses(c) {
+			if writeResponsesFailedSSE(c, errType, message) {
+				return
+			}
+		}
 		// Stream already started, send error as SSE event then close
 		flusher, ok := c.Writer.(http.Flusher)
 		if ok {
@@ -1438,10 +1446,16 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e
 }
 
 // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
+// When the writer has already been written to (a ping was flushed) it takes the
+// streamStarted branch so handleStreamingAwareError sends a protocol-compliant
+// terminal event over SSE; otherwise the client would only see a silent EOF.
 func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
-	if c == nil || c.Writer == nil || c.Writer.Written() {
+	if c == nil || c.Writer == nil {
 		return false
 	}
+	if c.Writer.Written() {
+		streamStarted = true
+	}
 	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
 	return true
 }
diff --git a/backend/internal/handler/gateway_handler_error_fallback_test.go b/backend/internal/handler/gateway_handler_error_fallback_test.go
index 4fce5ec1..fe9e2ebf 100644
--- a/backend/internal/handler/gateway_handler_error_fallback_test.go
+++ b/backend/internal/handler/gateway_handler_error_fallback_test.go
@@ -33,7 +33,9 @@ func TestGatewayEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testi
 	assert.Equal(t, "Upstream request failed", errorObj["message"])
 }
 
-func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
+// After the writer has been written to, ensureForwardErrorResponse must append the error as SSE
+// instead of ending in a silent EOF. Non-/responses paths take the legacy data:{"type":"error"} branch.
+func TestGatewayEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
 	gin.SetMode(gin.TestMode)
 	w := httptest.NewRecorder()
 	c, _ := gin.CreateTestContext(w)
@@ -43,7 +45,27 @@ func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *tes
 	h := &GatewayHandler{}
 	wrote := h.ensureForwardErrorResponse(c, false)
 
-	require.False(t, wrote)
+	require.True(t, wrote)
 	require.Equal(t, http.StatusTeapot, w.Code)
-	assert.Equal(t, "already written", w.Body.String())
+	assert.Contains(t, w.Body.String(), "already written")
+	assert.Contains(t, w.Body.String(), `data: {"type":"error"`)
+}
+
+// Case B regression: on the Anthropic-backed /responses path, when the writer has already been
+// written to, ensureForwardErrorResponse must still emit response.failed.
+func TestGatewayEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
+	gin.SetMode(gin.TestMode)
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
+	_, _ = c.Writer.WriteString(":\n\n")
+
+	h := &GatewayHandler{}
+	wrote := h.ensureForwardErrorResponse(c, false)
+
+	require.True(t, wrote)
+	body := w.Body.String()
+	assert.Contains(t, body, ":\n\n")
+	assert.Contains(t, body, "event: response.failed\n")
+	assert.Contains(t, body, `"type":"response.failed"`)
 }
diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go
index 9c5560f5..5464d654 100644
--- a/backend/internal/handler/openai_gateway_handler.go
+++ b/backend/internal/handler/openai_gateway_handler.go
@@ -1691,6 +1691,15 @@ func (h *OpenAIGatewayHandler) mapUpstreamError(statusCode int) (int, string, st
 // handleStreamingAwareError handles errors that may occur after streaming has started
 func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
 	if streamStarted {
+		// Strict /v1/responses SDKs (Codex CLI) require the terminal event to be one of
+		// response.completed/failed/incomplete/cancelled.
+		// A generic `event: error` frame is not recognized as a terminal event and leads to
+		// "stream closed before response.completed".
+		if inboundIsResponses(c) {
+			if writeResponsesFailedSSE(c, errType, message) {
+				return
+			}
+		}
 		// Stream already started, send error as SSE event then close
 		flusher, ok := c.Writer.(http.Flusher)
 		if ok {
@@ -1710,9 +1719,17 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status
 
 // ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
 func (h *OpenAIGatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
-	if c == nil || c.Writer == nil || c.Writer.Written() {
+	if c == nil || c.Writer == nil {
 		return false
 	}
+	// The old implementation returned false as soon as Writer.Written() was true, so an
+	// upstream error after a flushed ping (http2 timeout, dropped connection, ...) could never
+	// reach the client: HTTP 200 was locked in, TCP just hit EOF, and Codex CLI reported
+	// "stream closed before response.completed". Now an already-written Writer forces the
+	// streamStarted branch so handleStreamingAwareError sends a compliant response.failed over SSE.
+	if c.Writer.Written() {
+		streamStarted = true
+	}
 	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
 	return true
 }
diff --git a/backend/internal/handler/openai_gateway_handler_test.go b/backend/internal/handler/openai_gateway_handler_test.go
index 6bddbce9..49b7b9ec 100644
--- a/backend/internal/handler/openai_gateway_handler_test.go
+++ b/backend/internal/handler/openai_gateway_handler_test.go
@@ -174,7 +174,11 @@ func TestOpenAIEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testin
 	assert.Equal(t, "Upstream request failed", errorObj["message"])
 }
 
-func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
+// After the writer has been written to, ensureForwardErrorResponse must still append the
+// error for the client as SSE (streamStarted is forced to true).
+// This is the case B fix: the old implementation returned false on Writer.Written, so the
+// client only got a silent EOF and Codex CLI reported "stream closed before response.completed".
+func TestOpenAIEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
 	gin.SetMode(gin.TestMode)
 	w := httptest.NewRecorder()
 	c, _ := gin.CreateTestContext(w)
@@ -184,9 +188,34 @@ func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *test
 	h := &OpenAIGatewayHandler{}
 	wrote := h.ensureForwardErrorResponse(c, false)
 
-	require.False(t, wrote)
+	require.True(t, wrote, "must attempt to communicate the failure to the client via SSE")
+	// The status code cannot change (headers are already flushed), but the body must gain an SSE error event.
 	require.Equal(t, http.StatusTeapot, w.Code)
-	assert.Equal(t, "already written", w.Body.String())
+	assert.Contains(t, w.Body.String(), "already written")
+	// Non-/responses paths take the legacy event: error branch.
+	assert.Contains(t, w.Body.String(), "event: error\n")
+}
+
+// Case B regression test: on a /responses path with an already-written writer (simulating a flushed
+// ping), ensureForwardErrorResponse must emit response.failed so Codex receives a compliant terminal event.
+func TestOpenAIEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
+	gin.SetMode(gin.TestMode)
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
+	// Simulate the state after a flushed ping: the writer has already emitted an SSE comment frame.
+	_, _ = c.Writer.WriteString(":\n\n")
+
+	h := &OpenAIGatewayHandler{}
+	wrote := h.ensureForwardErrorResponse(c, false)
+
+	require.True(t, wrote)
+	body := w.Body.String()
+	assert.Contains(t, body, ":\n\n", "earlier ping bytes preserved")
+	assert.Contains(t, body, "event: response.failed\n", "appended a Responses terminal event")
+	assert.Contains(t, body, `"type":"response.failed"`)
+	assert.Contains(t, body, `"code":"upstream_error"`)
+	assert.Contains(t, body, "Upstream request failed")
 }
 
 func TestShouldLogOpenAIForwardFailureAsWarn(t *testing.T) {
@@ -266,7 +295,9 @@ func TestOpenAIRecoverResponsesPanic_NoPanicNoWrite(t *testing.T) {
 	assert.Equal(t, "", w.Body.String())
 }
 
-func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T) {
+// Panic inside an already-flushed /v1/responses stream: the status code cannot change (already written),
+// but the body must gain response.failed so the client sees a compliant termination, not a silent EOF.
+func TestOpenAIRecoverResponsesPanic_AppendsResponseFailedAfterWritten(t *testing.T) {
 	gin.SetMode(gin.TestMode)
 
 	w := httptest.NewRecorder()
@@ -284,7 +315,9 @@ func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T
 	})
 
 	require.Equal(t, http.StatusTeapot, w.Code)
-	assert.Equal(t, "already written", w.Body.String())
+	body := w.Body.String()
+	assert.Contains(t, body, "already written")
+	assert.Contains(t, body, "event: response.failed\n")
 }
 
 func TestOpenAIMissingResponsesDependencies(t *testing.T) {
diff --git a/backend/internal/handler/stream_error_event.go b/backend/internal/handler/stream_error_event.go
new file mode 100644
index 00000000..fc5bf61d
--- /dev/null
+++ b/backend/internal/handler/stream_error_event.go
@@ -0,0 +1,176 @@
+package handler
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
+	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+)
+
+// writeResponsesFailedSSE emits a `response.failed` SSE event in the OpenAI
+// Responses API protocol after the stream has already started.
+//
+// Why it is needed: once the SSE headers or any data (for example a ping
+// comment sent while waiting for a slot) have been flushed, the HTTP 200 status
+// is fixed, so a later gateway error can only be reported as another SSE event.
+// A generic `event: error` frame is not a terminal event of the Responses
+// protocol; strict SDKs such as Codex CLI, which wait for
+// `response.completed/failed/incomplete/cancelled`, then fail with
+// "stream closed before response.completed".
+//
+// The field set follows apicompat.makeResponsesCompletedEvent:
+// id/object/model/status/output/error. sequence_number is deliberately left
+// out: the last sequence number of the current stream cannot be obtained
+// reliably here, the OpenAI spec makes the field optional, and omitting it
+// avoids breaking the monotonicity constraint.
+//
+// Every string value is written with writeJSONString so the data line stays
+// valid JSON even when message contains control characters or invalid UTF-8.
+//
+// It returns true once an SSE write has been attempted (whether or not the
+// write succeeded; the caller should simply return). It returns false when the
+// writer does not implement http.Flusher, so the error cannot be reported as
+// SSE; the caller cannot fall back to JSON either (HTTP 200 is already fixed),
+// which usually means the connection is broken, and the handler should return
+// and let the upper layer close the connection.
+func writeResponsesFailedSSE(c *gin.Context, errType, message string) bool {
+	flusher, ok := c.Writer.(http.Flusher)
+	if !ok {
+		return false
+	}
+	rid := synthesizeResponseID(c)
+	model := requestModel(c)
+	code := mapResponsesErrorCode(errType)
+
+	var b strings.Builder
+	b.Grow(256 + len(message) + len(model))
+	b.WriteString(`{"type":"response.failed","response":{`)
+	b.WriteString(`"id":`)
+	writeJSONString(&b, rid)
+	b.WriteString(`,"object":"response"`)
+	if model != "" {
+		b.WriteString(`,"model":`)
+		writeJSONString(&b, model)
+	}
+	b.WriteString(`,"status":"failed","output":[],"error":{"code":`)
+	writeJSONString(&b, code)
+	b.WriteString(`,"message":`)
+	writeJSONString(&b, message)
+	b.WriteString(`}}}`)
+
+	if _, err := fmt.Fprintf(c.Writer, "event: response.failed\ndata: %s\n\n", b.String()); err != nil {
+		_ = c.Error(err)
+		return true
+	}
+	flusher.Flush()
+	return true
+}
+
+// writeJSONString appends s to b encoded as a JSON string literal.
+//
+// strconv.Quote must not be used here: it produces Go string syntax, and its
+// escapes for non-printable input (\a, \v, \x1b, \U000e0001) are not valid
+// JSON, which would make the whole event unparseable for the client.
+// json.Marshal emits only JSON escapes (including \u003c-style escapes for
+// <, > and &, which decode identically) and replaces invalid UTF-8 with U+FFFD.
+func writeJSONString(b *strings.Builder, s string) {
+	enc, err := json.Marshal(s)
+	if err != nil {
+		// Not expected for a plain string; fall back to an empty literal so the
+		// surrounding object stays well-formed.
+		enc = []byte(`""`)
+	}
+	b.Write(enc)
+}
+
+// inboundIsResponses reports whether the current request is on any /responses
+// route.
+//
+// Comparing GetInboundEndpoint(c) with EndpointResponses is not enough:
+// NormalizeInboundEndpoint only recognizes paths containing "/v1/responses",
+// while the project registers several route groups (gateway_v1, top-level
+// bare, codex direct). For r.POST("/responses", ...) and
+// codexDirect.POST("/responses", ...) c.FullPath() has no "/v1/" prefix and is
+// normalized to the raw path, so the protocol-compliant terminal event would
+// never be sent.
+//
+// The check therefore inspects the path itself and covers every variant:
+//   - /v1/responses
+//   - /v1/responses/compact
+//   - /responses
+//   - /responses/compact
+//   - /backend-api/codex/responses
+//   - /backend-api/codex/responses/compact
+//
+// When c.FullPath() is empty it falls back to c.Request.URL.Path.
+func inboundIsResponses(c *gin.Context) bool {
+	if c == nil {
+		return false
+	}
+	p := strings.TrimRight(c.FullPath(), "/")
+	if p == "" && c.Request != nil && c.Request.URL != nil {
+		p = strings.TrimRight(c.Request.URL.Path, "/")
+	}
+	if p == "" {
+		return false
+	}
+	return strings.HasSuffix(p, "/responses") || strings.Contains(p, "/responses/")
+}
+
+// synthesizeResponseID builds the id for a synthetic response.failed event.
+// It prefers the server-generated request id stored in the request context
+// under ctxkey.RequestID (written by request_logger, per the original author)
+// so a client-side error can be matched to server logs, and falls back to a
+// fresh UUID when none is present. Hyphens are stripped in both cases.
+func synthesizeResponseID(c *gin.Context) string {
+	if c != nil && c.Request != nil {
+		if rid, ok := c.Request.Context().Value(ctxkey.RequestID).(string); ok {
+			if rid = strings.TrimSpace(rid); rid != "" {
+				return "resp_" + strings.ReplaceAll(rid, "-", "")
+			}
+		}
+	}
+	return "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "")
+}
+
+// requestModel returns the inbound model of the current request, read from the
+// gin context under opsModelKey (written by setOpsRequestContext). It returns
+// "" when the value is missing or not a string; callers use that to omit the
+// field.
+func requestModel(c *gin.Context) string {
+	if c == nil {
+		return ""
+	}
+	if v, ok := c.Get(opsModelKey); ok {
+		if s, ok := v.(string); ok {
+			return strings.TrimSpace(s)
+		}
+	}
+	return ""
+}
+
+// mapResponsesErrorCode maps an internal errType to an error.code commonly used
+// by the Responses protocol. Types without an explicit mapping are returned
+// unchanged so the code stays at least readable.
+func mapResponsesErrorCode(errType string) string {
+	switch errType {
+	case "rate_limit_error":
+		return "rate_limit_exceeded"
+	case "invalid_request_error":
+		return "invalid_request"
+	case "permission_error":
+		return "permission_denied"
+	case "authentication_error":
+		return "authentication_failed"
+	case "upstream_error":
+		return "upstream_error"
+	case "server_error", "api_error", "":
+		return "server_error"
+	default:
+		return errType
+	}
+}
diff --git a/backend/internal/handler/stream_error_event_test.go b/backend/internal/handler/stream_error_event_test.go
new file mode 100644
index 00000000..721b5856
--- /dev/null
+++ b/backend/internal/handler/stream_error_event_test.go
@@ -0,0 +1,253 @@
+package handler
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
+	"github.com/gin-gonic/gin"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// Regression for the production incident on 2026-05-24 around 9:13 CST:
+// user 16 sent /v1/responses with stream:true via Codex CLI; the user-concurrency
+// slot wait sent SSE ping comments (flushing HTTP 200 + headers), then the 30s
+// timeout fired and the handler emitted `event: error\ndata: {...}`. Codex CLI
+// does not recognize that as a Responses terminal event and reports
+// "stream closed before response.completed". The fix is to emit a synthetic
+// response.failed event when the inbound endpoint is /v1/responses.
+
+func newGinContextForEndpoint(t *testing.T, endpoint string) (*gin.Context, *httptest.ResponseRecorder) {
+	t.Helper()
+	gin.SetMode(gin.TestMode)
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Request = httptest.NewRequest(http.MethodPost, endpoint, nil)
+	return c, w
+}
+
+// parseResponsesFailedSSE extracts the JSON from the SSE data line and returns (response object, error object).
+func parseResponsesFailedSSE(t *testing.T, body string) (map[string]any, map[string]any) {
+	t.Helper()
+	require.True(t, strings.HasPrefix(body, "event: response.failed\n"),
+		"expect event: response.failed prefix, got: %q", body)
+	require.True(t, strings.HasSuffix(body, "\n\n"))
+
+	lines := strings.SplitN(strings.TrimSuffix(body, "\n\n"), "\n", 2)
+	require.Len(t, lines, 2)
+	require.True(t, strings.HasPrefix(lines[1], "data: "))
+	jsonStr := strings.TrimPrefix(lines[1], "data: ")
+
+	var parsed map[string]any
+	require.NoError(t, json.Unmarshal([]byte(jsonStr), &parsed), "data must be valid JSON: %s", jsonStr)
+
+	assert.Equal(t, "response.failed", parsed["type"])
+	// sequence_number is deliberately not sent, to avoid clashing with the sequence numbers of later real events.
+	_, hasSeq := parsed["sequence_number"]
+	assert.False(t, hasSeq, "synthetic event must not emit sequence_number")
+
+	resp, ok := parsed["response"].(map[string]any)
+	require.True(t, ok, "response object missing")
+	assert.Equal(t, "response", resp["object"])
+	assert.Equal(t, "failed", resp["status"])
+
+	errObj, ok := resp["error"].(map[string]any)
+	require.True(t, ok, "error object missing")
+
+	return resp, errObj
+}
+
+// OpenAI handler: /v1/responses streaming, after stream started, must emit response.failed.
+func TestOpenAIHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointResponses)
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error",
+		"Concurrency limit exceeded for user, please retry later", true)
+
+	resp, errObj := parseResponsesFailedSSE(t, w.Body.String())
+
+	id, _ := resp["id"].(string)
+	assert.True(t, strings.HasPrefix(id, "resp_"), "id should start with resp_, got %q", id)
+	assert.Equal(t, "rate_limit_exceeded", errObj["code"])
+	assert.Equal(t, "Concurrency limit exceeded for user, please retry later", errObj["message"])
+}
+
+// When setOpsRequestContext has stored a model, the synthetic event must carry it (aligned with the existing makeResponsesCompletedEvent).
+func TestOpenAIHandleStreamingAwareError_ResponsesStreamingIncludesModel(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointResponses)
+	setOpsRequestContext(c, "gpt-5.5", true)
+
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
+
+	resp, _ := parseResponsesFailedSSE(t, w.Body.String())
+	assert.Equal(t, "gpt-5.5", resp["model"])
+}
+
+// Without a model the model field must be absent (an empty string would pollute downstream parsing).
+func TestOpenAIHandleStreamingAwareError_ResponsesStreamingOmitsEmptyModel(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointResponses)
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
+
+	resp, _ := parseResponsesFailedSSE(t, w.Body.String())
+	_, hasModel := resp["model"]
+	assert.False(t, hasModel, "model field must be omitted when unknown")
+}
+
+// When the request context carries ctxkey.RequestID, the synthetic id must derive from it so it can be tied to server logs.
+func TestOpenAIHandleStreamingAwareError_ResponsesStreamingReusesRequestID(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointResponses)
+	c.Request = c.Request.WithContext(
+		context.WithValue(c.Request.Context(), ctxkey.RequestID, "fd277bc5-ff7e-45d1-8aa9-f54e1df318f1"),
+	)
+
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "x", true)
+
+	resp, _ := parseResponsesFailedSSE(t, w.Body.String())
+	assert.Equal(t, "resp_fd277bc5ff7e45d18aa9f54e1df318f1", resp["id"])
+}
+
+// Mirrors TestOpenAIHandleStreamingAwareError_JSONEscaping from the legacy branch:
+// the new response.failed payload must also escape special characters in message correctly,
+// otherwise downstream SDKs fail to parse the JSON.
+func TestOpenAIHandleStreamingAwareError_ResponsesStreamingJSONEscaping(t *testing.T) {
+	cases := []struct {
+		name    string
+		errType string
+		message string
+	}{
+		{"双引号", "server_error", `upstream returned "invalid" response`},
+		{"反斜杠", "server_error", `path C:\Users\test\file.txt not found`},
+		{"双引号+反斜杠", "upstream_error", `error parsing "key\value": unexpected token`},
+		{"换行与制表", "server_error", "line1\nline2\ttab"},
+		{"普通", "upstream_error", "Upstream service temporarily unavailable"},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			c, w := newGinContextForEndpoint(t, EndpointResponses)
+			h := &OpenAIGatewayHandler{}
+			h.handleStreamingAwareError(c, http.StatusBadGateway, tc.errType, tc.message, true)
+
+			_, errObj := parseResponsesFailedSSE(t, w.Body.String())
+			assert.Equal(t, tc.message, errObj["message"], "message 必须被原样还原")
+		})
+	}
+}
+
+// OpenAI handler: /v1/chat/completions streaming keeps the legacy event: error format
+// (out of scope for this fix; covered to prevent regression of unrelated paths).
+func TestOpenAIHandleStreamingAwareError_ChatCompletionsStreamingKeepsLegacy(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointChatCompletions)
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
+
+	body := w.Body.String()
+	assert.True(t, strings.HasPrefix(body, "event: error\n"), "got: %q", body)
+}
+
+// Gateway (Anthropic-backed) handler: /v1/responses path also must emit response.failed.
+func TestGatewayHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointResponses)
+	h := &GatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "upstream gone", true)
+
+	_, errObj := parseResponsesFailedSSE(t, w.Body.String())
+	assert.Equal(t, "upstream_error", errObj["code"])
+	assert.Equal(t, "upstream gone", errObj["message"])
+}
+
+// Gateway handler: /v1/messages preserves the legacy data:{type:error,...} format
+// (Anthropic spec accepts a type:"error" stream event).
+func TestGatewayHandleStreamingAwareError_MessagesStreamingKeepsLegacy(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, EndpointMessages)
+	h := &GatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
+
+	body := w.Body.String()
+	assert.True(t, strings.HasPrefix(body, `data: {"type":"error"`), "got: %q", body)
+}
+
+// The project registers /responses under several route groups: /v1/responses (gateway), bare /responses (top-level),
+// and /backend-api/codex/responses (codex direct). The fix must cover all of them,
+// otherwise clients on the uncovered paths get no response.failed and still report stream closed.
+// This is the bug user 16 actually hit in production on 2026-05-24 at ~11:05 UTC.
+func TestInboundIsResponses_CoversAllRoutes(t *testing.T) {
+	cases := []struct {
+		route string
+		want  bool
+	}{
+		{"/v1/responses", true},
+		{"/v1/responses/compact", true},
+		{"/responses", true}, // <-- the route user 16 actually used
+		{"/responses/compact", true},
+		{"/backend-api/codex/responses", true},
+		{"/backend-api/codex/responses/compact", true},
+		{"/v1/chat/completions", false},
+		{"/v1/messages", false},
+		{"/", false},
+		{"/responses-fake", false},
+	}
+	for _, tc := range cases {
+		t.Run(tc.route, func(t *testing.T) {
+			c, _ := newGinContextForEndpoint(t, tc.route)
+			assert.Equal(t, tc.want, inboundIsResponses(c), "route=%q", tc.route)
+		})
+	}
+}
+
+// c.Request.URL.Path is the fallback (when c.FullPath() is empty, e.g. in some test fixtures).
+func TestInboundIsResponses_FallsBackToURLPath(t *testing.T) {
+	gin.SetMode(gin.TestMode)
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Request = httptest.NewRequest(http.MethodPost, "/responses", nil)
+	// Here c.FullPath() is "", so the check must fall back to URL.Path.
+	assert.True(t, inboundIsResponses(c), "URL.Path fallback must work when FullPath is empty")
+}
+
+// Production incident regression: user 16 came in on /responses, which must emit response.failed.
+func TestOpenAIHandleStreamingAwareError_BareResponsesRouteEmitsResponseFailed(t *testing.T) {
+	c, w := newGinContextForEndpoint(t, "/responses")
+	h := &OpenAIGatewayHandler{}
+	h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error",
+		"Concurrency limit exceeded for user, please retry later", true)
+
+	resp, errObj := parseResponsesFailedSSE(t, w.Body.String())
+	id, _ := resp["id"].(string)
+	assert.True(t, strings.HasPrefix(id, "resp_"))
+	assert.Equal(t, "rate_limit_exceeded", errObj["code"])
+}
+
+// Synthesized response.failed id falls back to uuid when no request_id is present.
+func TestSynthesizeResponseID_FallbackUUID(t *testing.T) {
+	c, _ := newGinContextForEndpoint(t, EndpointResponses)
+	id := synthesizeResponseID(c)
+	assert.True(t, strings.HasPrefix(id, "resp_"))
+	// A uuid without hyphens is 32 hex characters; with the "resp_" prefix that is 37.
+	assert.Len(t, id, 37)
+}
+
+func TestMapResponsesErrorCode(t *testing.T) {
+	cases := []struct{ in, out string }{
+		{"rate_limit_error", "rate_limit_exceeded"},
+		{"invalid_request_error", "invalid_request"},
+		{"permission_error", "permission_denied"},
+		{"authentication_error", "authentication_failed"},
+		{"upstream_error", "upstream_error"},
+		{"server_error", "server_error"},
+		{"api_error", "server_error"},
+		{"", "server_error"},
+		{"custom_thing", "custom_thing"},
+	}
+	for _, tc := range cases {
+		assert.Equal(t, tc.out, mapResponsesErrorCode(tc.in), "in=%q", tc.in)
+	}
+}