Merge pull request #2530 from lyen1688/fix/openai-responses-sse-terminal

修复 OpenAI Responses SSE 终止事件识别
This commit is contained in:
Wesley Liddick 2026-05-17 16:20:04 +08:00 committed by GitHub
commit f5bd25bea0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 474 additions and 12 deletions

View File

@ -1360,6 +1360,135 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing.
}
}
func TestForwardAsAnthropic_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0}
body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := []byte(strings.Join([]string{
`event: response.completed`,
`data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`,
``,
``,
}, "\n"))
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_terminal"}},
Body: upstreamStream,
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
type forwardResult struct {
result *OpenAIForwardResult
err error
}
resultCh := make(chan forwardResult, 1)
go func() {
result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1")
resultCh <- forwardResult{result: result, err: err}
}()
select {
case got := <-resultCh:
require.NoError(t, got.err)
require.NotNil(t, got.result)
require.Equal(t, 15, got.result.Usage.InputTokens)
require.Equal(t, 6, got.result.Usage.OutputTokens)
require.Equal(t, 5, got.result.Usage.CacheReadInputTokens)
case <-time.After(time.Second):
require.Fail(t, "ForwardAsAnthropic should use SSE event names when data payloads omit type")
}
}
func TestForwardAsAnthropic_EventNamedTerminalWithKeepaliveReturns(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0}
body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := []byte(strings.Join([]string{
`: upstream ping`,
``,
`event: response.completed`,
`data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`,
``,
``,
}, "\n"))
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_keepalive"}},
Body: upstreamStream,
}}
svc := &OpenAIGatewayService{
cfg: &config.Config{Gateway: config.GatewayConfig{
StreamKeepaliveInterval: 5,
}},
httpUpstream: upstream,
}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
type forwardResult struct {
result *OpenAIForwardResult
err error
}
resultCh := make(chan forwardResult, 1)
go func() {
result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1")
resultCh <- forwardResult{result: result, err: err}
}()
select {
case got := <-resultCh:
require.NoError(t, got.err)
require.NotNil(t, got.result)
require.Equal(t, 15, got.result.Usage.InputTokens)
require.Equal(t, 6, got.result.Usage.OutputTokens)
require.Equal(t, 5, got.result.Usage.CacheReadInputTokens)
case <-time.After(time.Second):
require.Fail(t, "ForwardAsAnthropic keepalive path should use SSE event names when data payloads omit type")
}
}
func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)
@ -1416,6 +1545,67 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi
}
}
func TestForwardAsAnthropic_BufferedEventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":false}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := []byte(strings.Join([]string{
`event: response.completed`,
`data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`,
``,
``,
}, "\n"))
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_buffered_event_named"}},
Body: upstreamStream,
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
type forwardResult struct {
result *OpenAIForwardResult
err error
}
resultCh := make(chan forwardResult, 1)
go func() {
result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1")
resultCh <- forwardResult{result: result, err: err}
}()
select {
case got := <-resultCh:
require.NoError(t, got.err)
require.NotNil(t, got.result)
require.Equal(t, 15, got.result.Usage.InputTokens)
require.Equal(t, 6, got.result.Usage.OutputTokens)
require.Equal(t, 5, got.result.Usage.CacheReadInputTokens)
require.Contains(t, rec.Body.String(), `"stop_reason":"end_turn"`)
case <-time.After(time.Second):
require.Fail(t, "ForwardAsAnthropic buffered response should use SSE event names when data payloads omit type")
}
}
func TestForwardAsAnthropic_DoneSentinelWithoutTerminalReturnsError(t *testing.T) {
gin.SetMode(gin.TestMode)

View File

@ -554,6 +554,13 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
missingTerminalErr := func() (*OpenAIForwardResult, error) {
return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event")
}
processFrame := func(frame openAICompatSSEFrame) bool {
payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType)
if strings.TrimSpace(payload) == "[DONE]" {
return false
}
return processDataLine(payload)
}
// Determine keepalive interval
keepaliveInterval := time.Duration(0)
@ -563,16 +570,17 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
// No keepalive: fast synchronous path
if streamInterval <= 0 && keepaliveInterval <= 0 {
var parser openAICompatSSEFrameParser
for scanner.Scan() {
line := scanner.Text()
payload, ok := extractOpenAISSEDataLine(line)
frame, ok := parser.AddLine(line)
if !ok {
continue
}
if strings.TrimSpace(payload) == "[DONE]" {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processDataLine(payload) {
if processFrame(frame) {
return finalizeStream()
}
}
@ -580,6 +588,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
handleScanErr(err)
return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err)
}
if frame, ok := parser.Finish(); ok {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processFrame(frame) {
return finalizeStream()
}
}
return missingTerminalErr()
}
@ -624,11 +640,20 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
keepaliveCh = keepaliveTicker.C
}
lastDataAt := time.Now()
var parser openAICompatSSEFrameParser
for {
select {
case ev, ok := <-events:
if !ok {
if frame, ok := parser.Finish(); ok {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processFrame(frame) {
return finalizeStream()
}
}
return missingTerminalErr()
}
if ev.err != nil {
@ -637,14 +662,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
}
lastDataAt = time.Now()
line := ev.line
payload, ok := extractOpenAISSEDataLine(line)
frame, ok := parser.AddLine(line)
if !ok {
continue
}
if strings.TrimSpace(payload) == "[DONE]" {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processDataLine(payload) {
if processFrame(frame) {
return finalizeStream()
}

View File

@ -236,6 +236,120 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te
}
}
func TestForwardAsChatCompletions_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := []byte(strings.Join([]string{
`event: response.created`,
`data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`,
``,
`event: response.output_text.delta`,
`data: {"delta":"ok"}`,
``,
`event: response.completed`,
`data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`,
``,
``,
}, "\n"))
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_named_terminal"}},
Body: upstreamStream,
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
type forwardResult struct {
result *OpenAIForwardResult
err error
}
resultCh := make(chan forwardResult, 1)
go func() {
result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1")
resultCh <- forwardResult{result: result, err: err}
}()
select {
case got := <-resultCh:
require.NoError(t, got.err)
require.NotNil(t, got.result)
require.Equal(t, 17, got.result.Usage.InputTokens)
require.Equal(t, 8, got.result.Usage.OutputTokens)
require.Equal(t, 6, got.result.Usage.CacheReadInputTokens)
require.Contains(t, rec.Body.String(), `"content":"ok"`)
case <-time.After(time.Second):
require.Fail(t, "ForwardAsChatCompletions should use SSE event names when data payloads omit type")
}
}
func TestForwardAsChatCompletions_EventTypeDoesNotLeakAcrossFrames(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := strings.Join([]string{
`event: response.created`,
`data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`,
``,
`data: {"type":"response.output_text.delta","delta":"ok"}`,
``,
`event: response.completed`,
`data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`,
``,
`data: [DONE]`,
``,
}, "\n")
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_boundary"}},
Body: io.NopCloser(strings.NewReader(upstreamBody)),
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1")
require.NoError(t, err)
require.NotNil(t, result)
require.Contains(t, rec.Body.String(), `"content":"ok"`)
require.Contains(t, rec.Body.String(), `data: [DONE]`)
}
func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)

View File

@ -560,10 +560,24 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal(
}()
defer close(done)
var parser openAICompatSSEFrameParser
for {
select {
case ev, ok := <-events:
if !ok {
if frame, ok := parser.Finish(); ok {
payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType)
var event apicompat.ResponsesStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err == nil {
acc.ProcessEvent(&event)
if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil {
if event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
}
return event.Response, usage, acc, nil
}
}
}
return nil, usage, acc, nil
}
resetTimeout()
@ -580,10 +594,11 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal(
if isOpenAICompatDoneSentinelLine(ev.line) {
return nil, usage, acc, nil
}
payload, ok := extractOpenAISSEDataLine(ev.line)
if !ok || payload == "" {
frame, ok := parser.AddLine(ev.line)
if !ok {
continue
}
payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType)
var event apicompat.ResponsesStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
@ -772,6 +787,10 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
missingTerminalErr := func() (*OpenAIForwardResult, error) {
return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event")
}
processFrame := func(frame openAICompatSSEFrame) bool {
payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType)
return processDataLine(payload)
}
// ── Determine keepalive interval ──
keepaliveInterval := time.Duration(0)
@ -781,16 +800,17 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
// ── No keepalive: fast synchronous path (no goroutine overhead) ──
if streamInterval <= 0 && keepaliveInterval <= 0 {
var parser openAICompatSSEFrameParser
for scanner.Scan() {
line := scanner.Text()
if isOpenAICompatDoneSentinelLine(line) {
return missingTerminalErr()
}
payload, ok := extractOpenAISSEDataLine(line)
frame, ok := parser.AddLine(line)
if !ok {
continue
}
if processDataLine(payload) {
if processFrame(frame) {
return finalizeStream()
}
}
@ -798,6 +818,14 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
handleScanErr(err)
return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err)
}
if frame, ok := parser.Finish(); ok {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processFrame(frame) {
return finalizeStream()
}
}
return missingTerminalErr()
}
@ -842,12 +870,21 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
keepaliveCh = keepaliveTicker.C
}
lastDataAt := time.Now()
var parser openAICompatSSEFrameParser
for {
select {
case ev, ok := <-events:
if !ok {
// Upstream closed
if frame, ok := parser.Finish(); ok {
if strings.TrimSpace(frame.Data) == "[DONE]" {
return missingTerminalErr()
}
if processFrame(frame) {
return finalizeStream()
}
}
return missingTerminalErr()
}
if ev.err != nil {
@ -859,11 +896,11 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
if isOpenAICompatDoneSentinelLine(line) {
return missingTerminalErr()
}
payload, ok := extractOpenAISSEDataLine(line)
frame, ok := parser.AddLine(line)
if !ok {
continue
}
if processDataLine(payload) {
if processFrame(frame) {
return finalizeStream()
}

View File

@ -4578,6 +4578,76 @@ func extractOpenAISSEDataLine(line string) (string, bool) {
return line[start:], true
}
func extractOpenAISSEEventLine(line string) (string, bool) {
if !strings.HasPrefix(line, "event:") {
return "", false
}
start := len("event:")
for start < len(line) {
if line[start] != ' ' && line[start] != ' ' {
break
}
start++
}
return strings.TrimSpace(line[start:]), true
}
type openAICompatSSEFrame struct {
EventType string
Data string
}
type openAICompatSSEFrameParser struct {
eventType string
dataLines []string
}
func (p *openAICompatSSEFrameParser) AddLine(line string) (openAICompatSSEFrame, bool) {
if line == "" {
return p.dispatch()
}
if strings.HasPrefix(line, ":") {
return openAICompatSSEFrame{}, false
}
if eventType, ok := extractOpenAISSEEventLine(line); ok {
p.eventType = eventType
return openAICompatSSEFrame{}, false
}
if data, ok := extractOpenAISSEDataLine(line); ok {
p.dataLines = append(p.dataLines, data)
}
return openAICompatSSEFrame{}, false
}
func (p *openAICompatSSEFrameParser) Finish() (openAICompatSSEFrame, bool) {
return p.dispatch()
}
func (p *openAICompatSSEFrameParser) dispatch() (openAICompatSSEFrame, bool) {
frame := openAICompatSSEFrame{
EventType: p.eventType,
Data: strings.Join(p.dataLines, "\n"),
}
p.eventType = ""
p.dataLines = nil
return frame, frame.Data != ""
}
func openAICompatPayloadWithEventType(payload, eventType string) string {
eventType = strings.TrimSpace(eventType)
if eventType == "" || strings.TrimSpace(payload) == "" || strings.TrimSpace(payload) == "[DONE]" {
return payload
}
if gjson.Get(payload, "type").Exists() {
return payload
}
patched, err := sjson.Set(payload, "type", eventType)
if err != nil {
return payload
}
return patched
}
func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
data, ok := extractOpenAISSEDataLine(line)
if !ok {

View File

@ -2293,3 +2293,29 @@ func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
require.Contains(t, rec.Body.String(), "upstream rejected request")
require.Contains(t, rec.Header().Get("Content-Type"), "application/json")
}
func TestOpenAICompatSSEFrameParserResetsEventTypeAtFrameBoundary(t *testing.T) {
var parser openAICompatSSEFrameParser
frame, ok := parser.AddLine("event: response.created")
require.False(t, ok)
require.Empty(t, frame)
frame, ok = parser.AddLine(`data: {"response":{"id":"resp_1"}}`)
require.False(t, ok)
require.Empty(t, frame)
frame, ok = parser.AddLine("")
require.True(t, ok)
require.Equal(t, "response.created", frame.EventType)
require.JSONEq(t, `{"response":{"id":"resp_1"}}`, frame.Data)
frame, ok = parser.AddLine(`data: {"delta":"ok"}`)
require.False(t, ok)
require.Empty(t, frame.EventType)
frame, ok = parser.AddLine("")
require.True(t, ok)
require.Empty(t, frame.EventType)
require.JSONEq(t, `{"delta":"ok"}`, frame.Data)
}