Merge pull request #2732 from wminjay/fix/responses-stream-failed-event

fix(openai): emit response.failed when /v1/responses SSE aborted post-flush
This commit is contained in:
Wesley Liddick 2026-05-25 18:12:25 +08:00 committed by GitHub
commit a18738b29e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 490 additions and 10 deletions

View File

@ -1420,6 +1420,14 @@ func (h *GatewayHandler) mapUpstreamError(statusCode int) (int, string, string)
// handleStreamingAwareError handles errors that may occur after streaming has started
func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
if streamStarted {
// /v1/responses 的严格 SDKCodex CLI要求终止事件必须属于
// response.completed/failed/incomplete/cancelled 集合。
// Anthropic-backed Responses 路径同样会因为通用 error 帧被拒。
if inboundIsResponses(c) {
if writeResponsesFailedSSE(c, errType, message) {
return
}
}
// Stream already started, send error as SSE event then close
flusher, ok := c.Writer.(http.Flusher)
if ok {
@ -1438,10 +1446,16 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e
}
// ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
// Writer 已被写过时ping 已 flush走 streamStarted 分支,
// 让 handleStreamingAwareError 通过 SSE 发协议合规的终止事件,
// 否则下游收到的就是 silent EOF。
func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
if c == nil || c.Writer == nil || c.Writer.Written() {
if c == nil || c.Writer == nil {
return false
}
if c.Writer.Written() {
streamStarted = true
}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
return true
}

View File

@ -33,7 +33,9 @@ func TestGatewayEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testi
assert.Equal(t, "Upstream request failed", errorObj["message"])
}
func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
// Writer 已写后 ensureForwardErrorResponse 必须把错误以 SSE 形式追加,
// 而不是 silent EOF。非 /responses 路径走 legacy data:{"type":"error"} 分支。
func TestGatewayEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -43,7 +45,27 @@ func TestGatewayEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *tes
h := &GatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.False(t, wrote)
require.True(t, wrote)
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
assert.Contains(t, w.Body.String(), "already written")
assert.Contains(t, w.Body.String(), `data: {"type":"error"`)
}
// case B 回归Anthropic-backed /responsesWriter 已被写过时
// ensureForwardErrorResponse 仍要发 response.failed。
func TestGatewayEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
_, _ = c.Writer.WriteString(":\n\n")
h := &GatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.True(t, wrote)
body := w.Body.String()
assert.Contains(t, body, ":\n\n")
assert.Contains(t, body, "event: response.failed\n")
assert.Contains(t, body, `"type":"response.failed"`)
}

View File

@ -1691,6 +1691,15 @@ func (h *OpenAIGatewayHandler) mapUpstreamError(statusCode int) (int, string, st
// handleStreamingAwareError handles errors that may occur after streaming has started
func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
if streamStarted {
// /v1/responses 的严格 SDKCodex CLI要求终止事件必须属于
// response.completed/failed/incomplete/cancelled 集合。
// 通用 `event: error` 帧不被识别为终止事件,会导致
// "stream closed before response.completed"。
if inboundIsResponses(c) {
if writeResponsesFailedSSE(c, errType, message) {
return
}
}
// Stream already started, send error as SSE event then close
flusher, ok := c.Writer.(http.Flusher)
if ok {
@ -1710,9 +1719,17 @@ func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status
// ensureForwardErrorResponse 在 Forward 返回错误但尚未写响应时补写统一错误响应。
func (h *OpenAIGatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarted bool) bool {
if c == nil || c.Writer == nil || c.Writer.Written() {
if c == nil || c.Writer == nil {
return false
}
// 旧实现在 Writer.Written 时直接 return false导致 ping 已 flush 之后的
// 上游错误http2 timeout、连接中断等完全无法把错误传给客户端——
// HTTP 200 已锁死TCP 直接 EOFCodex CLI 报 "stream closed before response.completed"。
// 这里改成Writer 已写过时强制走 streamStarted 分支,让
// handleStreamingAwareError 通过 SSE 发协议合规的 response.failed。
if c.Writer.Written() {
streamStarted = true
}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed", streamStarted)
return true
}

View File

@ -174,7 +174,11 @@ func TestOpenAIEnsureForwardErrorResponse_WritesFallbackWhenNotWritten(t *testin
assert.Equal(t, "Upstream request failed", errorObj["message"])
}
func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *testing.T) {
// Writer 已写后 ensureForwardErrorResponse 必须仍然把错误信息以 SSE
// 形式追加给客户端streamStarted 强制 true
// 这是 case B 修复:旧实现遇到 Writer.Written 直接 return false
// 客户端只能拿到 silent EOFCodex CLI 报 "stream closed before response.completed"。
func TestOpenAIEnsureForwardErrorResponse_AppendsSSEAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -184,9 +188,34 @@ func TestOpenAIEnsureForwardErrorResponse_DoesNotOverrideWrittenResponse(t *test
h := &OpenAIGatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.False(t, wrote)
require.True(t, wrote, "must attempt to communicate the failure to the client via SSE")
// 状态码改不了headers 已 flush但 body 应该追加 SSE 错误事件。
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
assert.Contains(t, w.Body.String(), "already written")
// 非 /responses 路径走 legacy event: error 分支。
assert.Contains(t, w.Body.String(), "event: error\n")
}
// case B 回归测试:/responses 路径Writer 已被写过(模拟 ping flushed
// ensureForwardErrorResponse 必须发 response.failed让 Codex 收到合规终止事件。
func TestOpenAIEnsureForwardErrorResponse_ResponsesRouteAfterWrittenEmitsResponseFailed(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, EndpointResponses, nil)
// 模拟 ping 已 flush 的状态Writer 已写过 1 个字节
_, _ = c.Writer.WriteString(":\n\n")
h := &OpenAIGatewayHandler{}
wrote := h.ensureForwardErrorResponse(c, false)
require.True(t, wrote)
body := w.Body.String()
assert.Contains(t, body, ":\n\n", "earlier ping bytes preserved")
assert.Contains(t, body, "event: response.failed\n", "appended a Responses terminal event")
assert.Contains(t, body, `"type":"response.failed"`)
assert.Contains(t, body, `"code":"upstream_error"`)
assert.Contains(t, body, "Upstream request failed")
}
func TestShouldLogOpenAIForwardFailureAsWarn(t *testing.T) {
@ -266,7 +295,9 @@ func TestOpenAIRecoverResponsesPanic_NoPanicNoWrite(t *testing.T) {
assert.Equal(t, "", w.Body.String())
}
func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T) {
// Panic 在已 flush 的 /v1/responses 流中:状态码无法改(已 written
// 但 body 应追加 response.failed 让客户端识别为合规截断而不是 silent EOF。
func TestOpenAIRecoverResponsesPanic_AppendsResponseFailedAfterWritten(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@ -284,7 +315,9 @@ func TestOpenAIRecoverResponsesPanic_DoesNotOverrideWrittenResponse(t *testing.T
})
require.Equal(t, http.StatusTeapot, w.Code)
assert.Equal(t, "already written", w.Body.String())
body := w.Body.String()
assert.Contains(t, body, "already written")
assert.Contains(t, body, "event: response.failed\n")
}
func TestOpenAIMissingResponsesDependencies(t *testing.T) {

View File

@ -0,0 +1,141 @@
package handler
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// writeResponsesFailedSSE emits a `response.failed` SSE event in the OpenAI
// Responses API protocol after the stream has already started.
//
// 必要性:一旦 SSE 头和任意数据(例如等待槽位时的 ping comment已经 flush
// HTTP 200 状态码就被固化。此后若网关需要回报错误,只能继续通过 SSE 事件传达。
// 通用的 `event: error` 帧不是 Responses 协议规定的终止事件,
// Codex CLI 等严格 SDK 会因为没收到 `response.completed/failed/incomplete/cancelled`
// 而抛出 "stream closed before response.completed"。
//
// 字段集对齐 apicompat.makeResponsesCompletedEventid/object/model/status/output/error。
// 故意不写 sequence_number本函数被调用时无法可靠拿到当前流的 last sequence
// 而 OpenAI spec 将 sequence_number 设为可选;省略避免破坏单调性约束。
//
// 返回 true 表示已尝试 SSE 写出(不论 Write 是否成功caller 都应直接 return
// 返回 false 表示 writer 不支持 Flusher无法以 SSE 形式回报错误;
// 此时 caller 也无法回退到 JSONHTTP 200 已固化),通常意味着连接已经损坏,
// 应当让请求处理函数 return由上层关闭连接。
func writeResponsesFailedSSE(c *gin.Context, errType, message string) bool {
flusher, ok := c.Writer.(http.Flusher)
if !ok {
return false
}
rid := synthesizeResponseID(c)
model := requestModel(c)
code := mapResponsesErrorCode(errType)
var b strings.Builder
b.Grow(256 + len(message) + len(model))
b.WriteString(`{"type":"response.failed","response":{`)
b.WriteString(`"id":`)
b.WriteString(strconv.Quote(rid))
b.WriteString(`,"object":"response"`)
if model != "" {
b.WriteString(`,"model":`)
b.WriteString(strconv.Quote(model))
}
b.WriteString(`,"status":"failed","output":[],"error":{"code":`)
b.WriteString(strconv.Quote(code))
b.WriteString(`,"message":`)
b.WriteString(strconv.Quote(message))
b.WriteString(`}}}`)
if _, err := fmt.Fprintf(c.Writer, "event: response.failed\ndata: %s\n\n", b.String()); err != nil {
_ = c.Error(err)
return true
}
flusher.Flush()
return true
}
// inboundIsResponses 判断当前请求是否落在任何 /responses 路由上。
//
// 不能直接用 GetInboundEndpoint(c) == EndpointResponses 比较,因为
// NormalizeInboundEndpoint 只识别包含 "/v1/responses" 子串的路径;
// 项目里实际注册了多组路由gateway_v1、top-level bare、codex direct
// 其中 r.POST("/responses", ...) 和 codexDirect.POST("/responses", ...)
// 的 c.FullPath() 不含 "/v1/" 前缀,会被归一化为原始路径,
// 导致协议合规终止事件没法发出去。
//
// 这里用 FullPath 的后缀判断,覆盖所有变体:
// - /v1/responses
// - /v1/responses/compact
// - /responses
// - /responses/compact
// - /backend-api/codex/responses
// - /backend-api/codex/responses/compact
func inboundIsResponses(c *gin.Context) bool {
if c == nil {
return false
}
p := strings.TrimRight(c.FullPath(), "/")
if p == "" && c.Request != nil && c.Request.URL != nil {
p = strings.TrimRight(c.Request.URL.Path, "/")
}
if p == "" {
return false
}
return strings.HasSuffix(p, "/responses") || strings.Contains(p, "/responses/")
}
// synthesizeResponseID 为合成的 response.failed 事件生成一个稳定的 id。
// 优先复用 server 端生成的 request_id存在 request.Context 里,由 request_logger 写入),
// 以便客户端报错能与 server 日志关联;缺失时回退 uuid。
func synthesizeResponseID(c *gin.Context) string {
if c != nil && c.Request != nil {
if rid, ok := c.Request.Context().Value(ctxkey.RequestID).(string); ok {
if rid = strings.TrimSpace(rid); rid != "" {
return "resp_" + strings.ReplaceAll(rid, "-", "")
}
}
}
return "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "")
}
// requestModel 取当前请求的 inbound model由 setOpsRequestContext 写入)。
// 缺失时返回 ""caller 据此决定是否忽略该字段。
func requestModel(c *gin.Context) string {
if c == nil {
return ""
}
if v, ok := c.Get(opsModelKey); ok {
if s, ok := v.(string); ok {
return strings.TrimSpace(s)
}
}
return ""
}
// mapResponsesErrorCode 把内部 errType 映射为 Responses 协议常见的 error.code。
// 无明确映射时原样返回,保证至少可读。
func mapResponsesErrorCode(errType string) string {
switch errType {
case "rate_limit_error":
return "rate_limit_exceeded"
case "invalid_request_error":
return "invalid_request"
case "permission_error":
return "permission_denied"
case "authentication_error":
return "authentication_failed"
case "upstream_error":
return "upstream_error"
case "server_error", "api_error", "":
return "server_error"
default:
return errType
}
}

View File

@ -0,0 +1,253 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Regression for the production incident on 2026-05-24 around 9:13 CST:
// user 16 sent /v1/responses with stream:true via Codex CLI; the user-concurrency
// slot wait sent SSE ping comments (flushing HTTP 200 + headers), then the 30s
// timeout fired and the handler emitted `event: error\ndata: {...}`. Codex CLI
// does not recognize that as a Responses terminal event and reports
// "stream closed before response.completed". The fix is to emit a synthetic
// response.failed event when the inbound endpoint is /v1/responses.
func newGinContextForEndpoint(t *testing.T, endpoint string) (*gin.Context, *httptest.ResponseRecorder) {
t.Helper()
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, endpoint, nil)
return c, w
}
// parseResponsesFailedSSE 抽出 SSE 中 data 行的 JSON返回 (response 对象, error 对象)。
func parseResponsesFailedSSE(t *testing.T, body string) (map[string]any, map[string]any) {
t.Helper()
require.True(t, strings.HasPrefix(body, "event: response.failed\n"),
"expect event: response.failed prefix, got: %q", body)
require.True(t, strings.HasSuffix(body, "\n\n"))
lines := strings.SplitN(strings.TrimSuffix(body, "\n\n"), "\n", 2)
require.Len(t, lines, 2)
require.True(t, strings.HasPrefix(lines[1], "data: "))
jsonStr := strings.TrimPrefix(lines[1], "data: ")
var parsed map[string]any
require.NoError(t, json.Unmarshal([]byte(jsonStr), &parsed), "data must be valid JSON: %s", jsonStr)
assert.Equal(t, "response.failed", parsed["type"])
// 故意不发 sequence_number避免与后续真实事件的序号冲突。
_, hasSeq := parsed["sequence_number"]
assert.False(t, hasSeq, "synthetic event must not emit sequence_number")
resp, ok := parsed["response"].(map[string]any)
require.True(t, ok, "response object missing")
assert.Equal(t, "response", resp["object"])
assert.Equal(t, "failed", resp["status"])
errObj, ok := resp["error"].(map[string]any)
require.True(t, ok, "error object missing")
return resp, errObj
}
// OpenAI handler: /v1/responses streaming, after stream started, must emit response.failed.
func TestOpenAIHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error",
"Concurrency limit exceeded for user, please retry later", true)
resp, errObj := parseResponsesFailedSSE(t, w.Body.String())
id, _ := resp["id"].(string)
assert.True(t, strings.HasPrefix(id, "resp_"), "id should start with resp_, got %q", id)
assert.Equal(t, "rate_limit_exceeded", errObj["code"])
assert.Equal(t, "Concurrency limit exceeded for user, please retry later", errObj["message"])
}
// 当 setOpsRequestContext 写过 model合成事件应回填该字段与 codebase 已有 makeResponsesCompletedEvent 对齐)。
func TestOpenAIHandleStreamingAwareError_ResponsesStreamingIncludesModel(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
setOpsRequestContext(c, "gpt-5.5", true)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
resp, _ := parseResponsesFailedSSE(t, w.Body.String())
assert.Equal(t, "gpt-5.5", resp["model"])
}
// 没有 model 时 model 字段不应出现(避免发空字符串污染下游解析)。
func TestOpenAIHandleStreamingAwareError_ResponsesStreamingOmitsEmptyModel(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
resp, _ := parseResponsesFailedSSE(t, w.Body.String())
_, hasModel := resp["model"]
assert.False(t, hasModel, "model field must be omitted when unknown")
}
// 当 request.Context 携带 ctxkey.RequestID 时,合成 id 应与之关联,便于和 server log 串起来。
func TestOpenAIHandleStreamingAwareError_ResponsesStreamingReusesRequestID(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
c.Request = c.Request.WithContext(
context.WithValue(c.Request.Context(), ctxkey.RequestID, "fd277bc5-ff7e-45d1-8aa9-f54e1df318f1"),
)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "x", true)
resp, _ := parseResponsesFailedSSE(t, w.Body.String())
assert.Equal(t, "resp_fd277bc5ff7e45d18aa9f54e1df318f1", resp["id"])
}
// 与旧分支的 TestOpenAIHandleStreamingAwareError_JSONEscaping 对齐:
// 新的 response.failed payload 也必须正确转义 message 里的特殊字符,
// 否则下游 SDK 解析 JSON 时会失败。
func TestOpenAIHandleStreamingAwareError_ResponsesStreamingJSONEscaping(t *testing.T) {
cases := []struct {
name string
errType string
message string
}{
{"双引号", "server_error", `upstream returned "invalid" response`},
{"反斜杠", "server_error", `path C:\Users\test\file.txt not found`},
{"双引号+反斜杠", "upstream_error", `error parsing "key\value": unexpected token`},
{"换行与制表", "server_error", "line1\nline2\ttab"},
{"普通", "upstream_error", "Upstream service temporarily unavailable"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, tc.errType, tc.message, true)
_, errObj := parseResponsesFailedSSE(t, w.Body.String())
assert.Equal(t, tc.message, errObj["message"], "message 必须被原样还原")
})
}
}
// OpenAI handler: /v1/chat/completions streaming keeps the legacy event: error format
// (out of scope for this fix; covered to prevent regression of unrelated paths).
func TestOpenAIHandleStreamingAwareError_ChatCompletionsStreamingKeepsLegacy(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointChatCompletions)
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
body := w.Body.String()
assert.True(t, strings.HasPrefix(body, "event: error\n"), "got: %q", body)
}
// Gateway (Anthropic-backed) handler: /v1/responses path also must emit response.failed.
func TestGatewayHandleStreamingAwareError_ResponsesStreamingEmitsResponseFailed(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointResponses)
h := &GatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "upstream gone", true)
_, errObj := parseResponsesFailedSSE(t, w.Body.String())
assert.Equal(t, "upstream_error", errObj["code"])
assert.Equal(t, "upstream gone", errObj["message"])
}
// Gateway handler: /v1/messages preserves the legacy data:{type:error,...} format
// (Anthropic spec accepts a type:"error" stream event).
func TestGatewayHandleStreamingAwareError_MessagesStreamingKeepsLegacy(t *testing.T) {
c, w := newGinContextForEndpoint(t, EndpointMessages)
h := &GatewayHandler{}
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", "boom", true)
body := w.Body.String()
assert.True(t, strings.HasPrefix(body, `data: {"type":"error"`), "got: %q", body)
}
// 项目里 /responses 注册在多组路由:/v1/responsesgateway、裸 /responsestop-level
// /backend-api/codex/responsescodex direct。我们 fix 必须覆盖全部,
// 否则一些客户端走的路径就不会发 response.failed照样报 stream closed。
// 这是生产 2026-05-24 ~11:05 UTC user 16 实际命中的 bug。
func TestInboundIsResponses_CoversAllRoutes(t *testing.T) {
cases := []struct {
route string
want bool
}{
{"/v1/responses", true},
{"/v1/responses/compact", true},
{"/responses", true}, // <-- 用户 16 实际走这条
{"/responses/compact", true},
{"/backend-api/codex/responses", true},
{"/backend-api/codex/responses/compact", true},
{"/v1/chat/completions", false},
{"/v1/messages", false},
{"/", false},
{"/responses-fake", false},
}
for _, tc := range cases {
t.Run(tc.route, func(t *testing.T) {
c, _ := newGinContextForEndpoint(t, tc.route)
assert.Equal(t, tc.want, inboundIsResponses(c), "route=%q", tc.route)
})
}
}
// 用 c.Request.URL.Path 作为 fallback当 c.FullPath() 为空时,例如某些测试 fixture
func TestInboundIsResponses_FallsBackToURLPath(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, "/responses", nil)
// 这种情况下 c.FullPath() 是 "",必须 fallback 到 URL.Path
assert.True(t, inboundIsResponses(c), "URL.Path fallback must work when FullPath is empty")
}
// 回归生产事故:用户 16 走 /responses 路径,必须发 response.failed。
func TestOpenAIHandleStreamingAwareError_BareResponsesRouteEmitsResponseFailed(t *testing.T) {
c, w := newGinContextForEndpoint(t, "/responses")
h := &OpenAIGatewayHandler{}
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error",
"Concurrency limit exceeded for user, please retry later", true)
resp, errObj := parseResponsesFailedSSE(t, w.Body.String())
id, _ := resp["id"].(string)
assert.True(t, strings.HasPrefix(id, "resp_"))
assert.Equal(t, "rate_limit_exceeded", errObj["code"])
}
// Synthesized response.failed id falls back to uuid when no request_id is present.
func TestSynthesizeResponseID_FallbackUUID(t *testing.T) {
c, _ := newGinContextForEndpoint(t, EndpointResponses)
id := synthesizeResponseID(c)
assert.True(t, strings.HasPrefix(id, "resp_"))
// uuid 去掉短横线后 32 hex 字符;前缀 "resp_" 共 37。
assert.Len(t, id, 37)
}
func TestMapResponsesErrorCode(t *testing.T) {
cases := []struct{ in, out string }{
{"rate_limit_error", "rate_limit_exceeded"},
{"invalid_request_error", "invalid_request"},
{"permission_error", "permission_denied"},
{"authentication_error", "authentication_failed"},
{"upstream_error", "upstream_error"},
{"server_error", "server_error"},
{"api_error", "server_error"},
{"", "server_error"},
{"custom_thing", "custom_thing"},
}
for _, tc := range cases {
assert.Equal(t, tc.out, mapResponsesErrorCode(tc.in), "in=%q", tc.in)
}
}