diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index e222b093..0ba2a63f 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -1360,6 +1360,135 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing. } } +func TestForwardAsAnthropic_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsAnthropic_EventNamedTerminalWithKeepaliveReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `: upstream ping`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_keepalive"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + StreamKeepaliveInterval: 5, + }}, + httpUpstream: upstream, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic keepalive path should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) @@ -1416,6 +1545,67 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi } } +func TestForwardAsAnthropic_BufferedEventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_buffered_event_named"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"stop_reason":"end_turn"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic buffered response should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_DoneSentinelWithoutTerminalReturnsError(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_chat_completions.go b/backend/internal/service/openai_gateway_chat_completions.go index 84d85c74..5b3c0e6f 100644 --- a/backend/internal/service/openai_gateway_chat_completions.go +++ b/backend/internal/service/openai_gateway_chat_completions.go @@ -554,6 +554,13 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + if strings.TrimSpace(payload) == "[DONE]" { + return false + } + return processDataLine(payload) + } // Determine keepalive interval keepaliveInterval := time.Duration(0) @@ -563,16 +570,17 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( // No keepalive: fast synchronous path if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -580,6 +588,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -624,11 +640,20 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -637,14 +662,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( } lastDataAt = time.Now() line := ev.line - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_chat_completions_test.go b/backend/internal/service/openai_gateway_chat_completions_test.go index b0d1fa31..a26091a3 100644 --- a/backend/internal/service/openai_gateway_chat_completions_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_test.go @@ -236,6 +236,120 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te } } +func TestForwardAsChatCompletions_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `event: response.output_text.delta`, + `data: {"delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 17, got.result.Usage.InputTokens) + require.Equal(t, 8, got.result.Usage.OutputTokens) + require.Equal(t, 6, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsChatCompletions should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsChatCompletions_EventTypeDoesNotLeakAcrossFrames(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `data: {"type":"response.output_text.delta","delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + `data: [DONE]`, + ``, + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_boundary"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + require.Contains(t, rec.Body.String(), `data: [DONE]`) +} + func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index aefa8fd2..6d74f7dd 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -560,10 +560,24 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( }() defer close(done) + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + var event apicompat.ResponsesStreamEvent + if err := json.Unmarshal([]byte(payload), &event); err == nil { + acc.ProcessEvent(&event) + if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil { + if event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } + return event.Response, usage, acc, nil + } + } + } return nil, usage, acc, nil } resetTimeout() @@ -580,10 +594,11 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( if isOpenAICompatDoneSentinelLine(ev.line) { return nil, usage, acc, nil } - payload, ok := extractOpenAISSEDataLine(ev.line) - if !ok || payload == "" { + frame, ok := parser.AddLine(ev.line) + if !ok { continue } + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) var event apicompat.ResponsesStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { @@ -772,6 +787,10 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + return processDataLine(payload) + } // ── Determine keepalive interval ── keepaliveInterval := time.Duration(0) @@ -781,16 +800,17 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( // ── No keepalive: fast synchronous path (no goroutine overhead) ── if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -798,6 +818,14 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -842,12 +870,21 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { // Upstream closed + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -859,11 +896,11 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e..a2276353 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -4578,6 +4578,76 @@ func extractOpenAISSEDataLine(line string) (string, bool) { return line[start:], true } +func extractOpenAISSEEventLine(line string) (string, bool) { + if !strings.HasPrefix(line, "event:") { + return "", false + } + start := len("event:") + for start < len(line) { + if line[start] != ' ' && line[start] != ' ' { + break + } + start++ + } + return strings.TrimSpace(line[start:]), true +} + +type openAICompatSSEFrame struct { + EventType string + Data string +} + +type openAICompatSSEFrameParser struct { + eventType string + dataLines []string +} + +func (p *openAICompatSSEFrameParser) AddLine(line string) (openAICompatSSEFrame, bool) { + if line == "" { + return p.dispatch() + } + if strings.HasPrefix(line, ":") { + return openAICompatSSEFrame{}, false + } + if eventType, ok := extractOpenAISSEEventLine(line); ok { + p.eventType = eventType + return openAICompatSSEFrame{}, false + } + if data, ok := extractOpenAISSEDataLine(line); ok { + p.dataLines = append(p.dataLines, data) + } + return openAICompatSSEFrame{}, false +} + +func (p *openAICompatSSEFrameParser) Finish() (openAICompatSSEFrame, bool) { + return p.dispatch() +} + +func (p *openAICompatSSEFrameParser) dispatch() (openAICompatSSEFrame, bool) { + frame := openAICompatSSEFrame{ + EventType: p.eventType, + Data: strings.Join(p.dataLines, "\n"), + } + p.eventType = "" + p.dataLines = nil + return frame, frame.Data != "" +} + +func openAICompatPayloadWithEventType(payload, eventType string) string { + eventType = strings.TrimSpace(eventType) + if eventType == "" || strings.TrimSpace(payload) == "" || strings.TrimSpace(payload) == "[DONE]" { + return payload + } + if gjson.Get(payload, "type").Exists() { + return payload + } + patched, err := sjson.Set(payload, "type", eventType) + if err != nil { + return payload + } + return patched +} + func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { data, ok := extractOpenAISSEDataLine(line) if !ok { diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe71..d636cf27 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -2293,3 +2293,29 @@ func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { require.Contains(t, rec.Body.String(), "upstream rejected request") require.Contains(t, rec.Header().Get("Content-Type"), "application/json") } + +func TestOpenAICompatSSEFrameParserResetsEventTypeAtFrameBoundary(t *testing.T) { + var parser openAICompatSSEFrameParser + + frame, ok := parser.AddLine("event: response.created") + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine(`data: {"response":{"id":"resp_1"}}`) + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Equal(t, "response.created", frame.EventType) + require.JSONEq(t, `{"response":{"id":"resp_1"}}`, frame.Data) + + frame, ok = parser.AddLine(`data: {"delta":"ok"}`) + require.False(t, ok) + require.Empty(t, frame.EventType) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Empty(t, frame.EventType) + require.JSONEq(t, `{"delta":"ok"}`, frame.Data) +}