diff --git a/backend/internal/pkg/apicompat/anthropic_responses_test.go b/backend/internal/pkg/apicompat/anthropic_responses_test.go index 7490654d..bb566081 100644 --- a/backend/internal/pkg/apicompat/anthropic_responses_test.go +++ b/backend/internal/pkg/apicompat/anthropic_responses_test.go @@ -517,6 +517,33 @@ func TestResponsesEventToAnthropicEvents_ResponseDone(t *testing.T) { assert.Nil(t, FinalizeResponsesAnthropicStream(state)) } +func TestResponsesEventToAnthropicEvents_TopLevelTerminalUsage(t *testing.T) { + state := NewResponsesEventToAnthropicState() + state.Model = "gpt-4o" + + events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + }, + Usage: &ResponsesUsage{ + InputTokens: 20, + OutputTokens: 6, + InputTokensDetails: &ResponsesInputTokensDetails{ + CachedTokens: 5, + }, + }, + }, state) + + require.Len(t, events, 2) + assert.Equal(t, "message_delta", events[0].Type) + require.NotNil(t, events[0].Usage) + assert.Equal(t, 15, events[0].Usage.InputTokens) + assert.Equal(t, 5, events[0].Usage.CacheReadInputTokens) + assert.Equal(t, 6, events[0].Usage.OutputTokens) + assert.Equal(t, "message_stop", events[1].Type) +} + func TestResponsesEventToAnthropicEvents_ResponseDoneIncomplete(t *testing.T) { state := NewResponsesEventToAnthropicState() state.Model = "gpt-4o" diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go index ad26f273..016c2415 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go @@ -846,6 +846,33 @@ func TestResponsesEventToChatChunks_ResponseDone(t *testing.T) { assert.Nil(t, FinalizeResponsesChatStream(state)) } +func TestResponsesEventToChatChunks_TopLevelTerminalUsage(t *testing.T) { + state := NewResponsesEventToChatState() + state.Model = "gpt-4o" + state.IncludeUsage = true + + chunks := ResponsesEventToChatChunks(&ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ + Status: "completed", + }, + Usage: &ResponsesUsage{ + InputTokens: 21, + OutputTokens: 9, + InputTokensDetails: &ResponsesInputTokensDetails{ + CachedTokens: 4, + }, + }, + }, state) + + require.Len(t, chunks, 2) + require.NotNil(t, chunks[1].Usage) + assert.Equal(t, 21, chunks[1].Usage.PromptTokens) + assert.Equal(t, 9, chunks[1].Usage.CompletionTokens) + require.NotNil(t, chunks[1].Usage.PromptTokensDetails) + assert.Equal(t, 4, chunks[1].Usage.PromptTokensDetails.CachedTokens) +} + func TestResponsesEventToChatChunks_ResponseDoneIncomplete(t *testing.T) { state := NewResponsesEventToChatState() state.Model = "gpt-4o" diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic.go b/backend/internal/pkg/apicompat/responses_to_anthropic.go index d7ef0145..6913f2eb 100644 --- a/backend/internal/pkg/apicompat/responses_to_anthropic.go +++ b/backend/internal/pkg/apicompat/responses_to_anthropic.go @@ -567,6 +567,12 @@ func resToAnthHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo events = append(events, closeCurrentBlock(state)...) stopReason := "end_turn" + if evt.Usage != nil { + usage := anthropicUsageFromResponsesUsage(evt.Usage) + state.InputTokens = usage.InputTokens + state.OutputTokens = usage.OutputTokens + state.CacheReadInputTokens = usage.CacheReadInputTokens + } if evt.Response != nil { if evt.Response.Usage != nil { usage := anthropicUsageFromResponsesUsage(evt.Response.Usage) diff --git a/backend/internal/pkg/apicompat/responses_to_chatcompletions.go b/backend/internal/pkg/apicompat/responses_to_chatcompletions.go index 2386771d..7e8354ee 100644 --- a/backend/internal/pkg/apicompat/responses_to_chatcompletions.go +++ b/backend/internal/pkg/apicompat/responses_to_chatcompletions.go @@ -293,20 +293,12 @@ func resToChatHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo state.Finalized = true finishReason := "stop" + if evt.Usage != nil { + state.Usage = chatUsageFromResponsesUsage(evt.Usage) + } if evt.Response != nil { if evt.Response.Usage != nil { - u := evt.Response.Usage - usage := &ChatUsage{ - PromptTokens: u.InputTokens, - CompletionTokens: u.OutputTokens, - TotalTokens: u.InputTokens + u.OutputTokens, - } - if u.InputTokensDetails != nil && u.InputTokensDetails.CachedTokens > 0 { - usage.PromptTokensDetails = &ChatTokenDetails{ - CachedTokens: u.InputTokensDetails.CachedTokens, - } - } - state.Usage = usage + state.Usage = chatUsageFromResponsesUsage(evt.Response.Usage) } switch evt.Response.Status { @@ -340,6 +332,23 @@ func resToChatHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo return chunks } +func chatUsageFromResponsesUsage(u *ResponsesUsage) *ChatUsage { + if u == nil { + return nil + } + usage := &ChatUsage{ + PromptTokens: u.InputTokens, + CompletionTokens: u.OutputTokens, + TotalTokens: u.InputTokens + u.OutputTokens, + } + if u.InputTokensDetails != nil && u.InputTokensDetails.CachedTokens > 0 { + usage.PromptTokensDetails = &ChatTokenDetails{ + CachedTokens: u.InputTokensDetails.CachedTokens, + } + } + return usage +} + func makeChatDeltaChunk(state *ResponsesEventToChatState, delta ChatDelta) ChatCompletionsChunk { return ChatCompletionsChunk{ ID: state.ID, diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index 7c46ccaf..8b576647 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -380,6 +380,8 @@ type ResponsesStreamEvent struct { // response.created / response.completed / response.done / response.failed / response.incomplete Response *ResponsesResponse `json:"response,omitempty"` + // 部分 OpenAI 兼容上游会把 usage 放在终止事件顶层,而不是 response.usage。 + Usage *ResponsesUsage `json:"usage,omitempty"` // response.output_item.added / response.output_item.done Item *ResponsesOutput `json:"item,omitempty"` diff --git a/backend/internal/service/openai_gateway_chat_completions.go b/backend/internal/service/openai_gateway_chat_completions.go index f49b3218..f44d88cf 100644 --- a/backend/internal/service/openai_gateway_chat_completions.go +++ b/backend/internal/service/openai_gateway_chat_completions.go @@ -76,7 +76,6 @@ func (s *OpenAIGatewayService) ForwardAsChatCompletions( } originalModel := chatReq.Model clientStream := chatReq.Stream - includeUsage := chatReq.StreamOptions != nil && chatReq.StreamOptions.IncludeUsage // 2. Resolve model mapping early so compat prompt_cache_key injection can // derive a stable seed from the final upstream model family. @@ -291,7 +290,7 @@ func (s *OpenAIGatewayService) ForwardAsChatCompletions( var result *OpenAIForwardResult var handleErr error if clientStream { - result, handleErr = s.handleChatStreamingResponse(resp, c, account, originalModel, billingModel, upstreamModel, includeUsage, startTime, len(body)) + result, handleErr = s.handleChatStreamingResponse(resp, c, account, originalModel, billingModel, upstreamModel, startTime, len(body)) } else { result, handleErr = s.handleChatBufferedStreamingResponse(resp, c, originalModel, billingModel, upstreamModel, startTime) } @@ -417,7 +416,6 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( originalModel string, billingModel string, upstreamModel string, - includeUsage bool, startTime time.Time, requestBodyLen int, ) (*OpenAIForwardResult, error) { @@ -441,7 +439,9 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( state := apicompat.NewResponsesEventToChatState() state.Model = originalModel - state.IncludeUsage = includeUsage + // 网关作为计费链路的一环,不能把下游 usage 输出绑定到客户端是否显式请求。 + // raw Chat Completions 直转路径已经强制透出 usage,这里保持同样行为,避免级联代理计费为 0。 + state.IncludeUsage = true var usage OpenAIUsage var firstTokenMs *int @@ -502,10 +502,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( } refusalDetector.ObservePayload([]byte(payload)) - // 仅按兼容转换器支持的终止事件提取 usage,避免无意扩大事件语义。 isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type) - if isTerminalEvent && event.Response != nil && event.Response.Usage != nil { - usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + if isTerminalEvent { + if event.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Usage) + } + if event.Response != nil && event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } } chunks := apicompat.ResponsesEventToChatChunks(&event, state) diff --git a/backend/internal/service/openai_gateway_chat_completions_raw_test.go b/backend/internal/service/openai_gateway_chat_completions_raw_test.go index 8e5fab20..eeb814f1 100644 --- a/backend/internal/service/openai_gateway_chat_completions_raw_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_raw_test.go @@ -328,7 +328,6 @@ func TestHandleChatStreamingResponse_SilentRefusalReasoningSummaryExempt(t *test "gpt-5.5", "gpt-5.5", "gpt-5.5", - false, time.Now(), openAISilentRefusalMinRequestBodyBytes, ) diff --git a/backend/internal/service/openai_gateway_chat_completions_test.go b/backend/internal/service/openai_gateway_chat_completions_test.go index a26091a3..9a5ea711 100644 --- a/backend/internal/service/openai_gateway_chat_completions_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_test.go @@ -180,6 +180,158 @@ func TestForwardAsChatCompletions_ClientDisconnectDrainsUpstreamUsage(t *testing require.Equal(t, 4, result.Usage.CacheReadInputTokens) } +func TestForwardAsChatCompletions_StreamsUsageWithoutClientStreamOptions(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + "", + `data: {"type":"response.output_text.delta","delta":"ok"}`, + "", + `data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":13,"output_tokens":7,"total_tokens":20,"input_tokens_details":{"cached_tokens":5}}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_usage_no_stream_options"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, 13, result.Usage.InputTokens) + require.Equal(t, 7, result.Usage.OutputTokens) + require.Equal(t, 5, result.Usage.CacheReadInputTokens) + + responseBody := rec.Body.String() + require.Contains(t, responseBody, `"usage"`) + require.Contains(t, responseBody, `"prompt_tokens":13`) + require.Contains(t, responseBody, `"completion_tokens":7`) + require.Contains(t, responseBody, `"cached_tokens":5`) +} + +func TestForwardAsChatCompletions_StreamsTopLevelTerminalUsage(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_top","model":"gpt-5.4","status":"in_progress","output":[]}}`, + "", + `data: {"type":"response.output_text.delta","delta":"ok"}`, + "", + `data: {"type":"response.completed","response":{"id":"resp_top","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}]},"usage":{"input_tokens":21,"output_tokens":9,"total_tokens":30,"input_tokens_details":{"cached_tokens":4}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_top_level_usage"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, 21, result.Usage.InputTokens) + require.Equal(t, 9, result.Usage.OutputTokens) + require.Equal(t, 4, result.Usage.CacheReadInputTokens) + + responseBody := rec.Body.String() + require.Contains(t, responseBody, `"usage"`) + require.Contains(t, responseBody, `"prompt_tokens":21`) + require.Contains(t, responseBody, `"completion_tokens":9`) + require.Contains(t, responseBody, `"cached_tokens":4`) +} + +func TestForwardAsChatCompletions_BufferedTopLevelTerminalUsage(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.completed","response":{"id":"resp_top_buffered","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}]},"usage":{"input_tokens":18,"output_tokens":6,"total_tokens":24,"input_tokens_details":{"cached_tokens":3}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_buffered_top_level_usage"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, 18, result.Usage.InputTokens) + require.Equal(t, 6, result.Usage.OutputTokens) + require.Equal(t, 3, result.Usage.CacheReadInputTokens) + + responseBody := rec.Body.String() + require.Contains(t, responseBody, `"usage"`) + require.Contains(t, responseBody, `"prompt_tokens":18`) + require.Contains(t, responseBody, `"completion_tokens":6`) + require.Contains(t, responseBody, `"cached_tokens":3`) +} + func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index 662d2a69..a624175b 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -570,6 +570,12 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( if err := json.Unmarshal([]byte(payload), &event); err == nil { acc.ProcessEvent(&event) if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil { + if event.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Usage) + if event.Response.Usage == nil { + event.Response.Usage = event.Usage + } + } if event.Response.Usage != nil { usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) } @@ -611,6 +617,12 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( acc.ProcessEvent(&event) if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil { + if event.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Usage) + if event.Response.Usage == nil { + event.Response.Usage = event.Usage + } + } if event.Response.Usage != nil { usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) } @@ -713,14 +725,18 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( return false } - // 仅按兼容转换器支持的终止事件提取 usage,避免无意扩大事件语义。 isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type) - if isTerminalEvent && event.Response != nil { - if id := strings.TrimSpace(event.Response.ID); id != "" { - responseID = id + if isTerminalEvent { + if event.Response != nil { + if id := strings.TrimSpace(event.Response.ID); id != "" { + responseID = id + } + if event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } } - if event.Response.Usage != nil { - usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + if event.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Usage) } }