fix(openai): preserve chat responses usage billing

This commit is contained in:
shaw 2026-05-26 21:33:28 +08:00
parent 4b9b63443f
commit f7ac5e5931
9 changed files with 268 additions and 26 deletions

View File

@ -517,6 +517,33 @@ func TestResponsesEventToAnthropicEvents_ResponseDone(t *testing.T) {
assert.Nil(t, FinalizeResponsesAnthropicStream(state))
}
func TestResponsesEventToAnthropicEvents_TopLevelTerminalUsage(t *testing.T) {
state := NewResponsesEventToAnthropicState()
state.Model = "gpt-4o"
events := ResponsesEventToAnthropicEvents(&ResponsesStreamEvent{
Type: "response.completed",
Response: &ResponsesResponse{
Status: "completed",
},
Usage: &ResponsesUsage{
InputTokens: 20,
OutputTokens: 6,
InputTokensDetails: &ResponsesInputTokensDetails{
CachedTokens: 5,
},
},
}, state)
require.Len(t, events, 2)
assert.Equal(t, "message_delta", events[0].Type)
require.NotNil(t, events[0].Usage)
assert.Equal(t, 15, events[0].Usage.InputTokens)
assert.Equal(t, 5, events[0].Usage.CacheReadInputTokens)
assert.Equal(t, 6, events[0].Usage.OutputTokens)
assert.Equal(t, "message_stop", events[1].Type)
}
func TestResponsesEventToAnthropicEvents_ResponseDoneIncomplete(t *testing.T) {
state := NewResponsesEventToAnthropicState()
state.Model = "gpt-4o"

View File

@ -846,6 +846,33 @@ func TestResponsesEventToChatChunks_ResponseDone(t *testing.T) {
assert.Nil(t, FinalizeResponsesChatStream(state))
}
func TestResponsesEventToChatChunks_TopLevelTerminalUsage(t *testing.T) {
state := NewResponsesEventToChatState()
state.Model = "gpt-4o"
state.IncludeUsage = true
chunks := ResponsesEventToChatChunks(&ResponsesStreamEvent{
Type: "response.completed",
Response: &ResponsesResponse{
Status: "completed",
},
Usage: &ResponsesUsage{
InputTokens: 21,
OutputTokens: 9,
InputTokensDetails: &ResponsesInputTokensDetails{
CachedTokens: 4,
},
},
}, state)
require.Len(t, chunks, 2)
require.NotNil(t, chunks[1].Usage)
assert.Equal(t, 21, chunks[1].Usage.PromptTokens)
assert.Equal(t, 9, chunks[1].Usage.CompletionTokens)
require.NotNil(t, chunks[1].Usage.PromptTokensDetails)
assert.Equal(t, 4, chunks[1].Usage.PromptTokensDetails.CachedTokens)
}
func TestResponsesEventToChatChunks_ResponseDoneIncomplete(t *testing.T) {
state := NewResponsesEventToChatState()
state.Model = "gpt-4o"

View File

@ -567,6 +567,12 @@ func resToAnthHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo
events = append(events, closeCurrentBlock(state)...)
stopReason := "end_turn"
if evt.Usage != nil {
usage := anthropicUsageFromResponsesUsage(evt.Usage)
state.InputTokens = usage.InputTokens
state.OutputTokens = usage.OutputTokens
state.CacheReadInputTokens = usage.CacheReadInputTokens
}
if evt.Response != nil {
if evt.Response.Usage != nil {
usage := anthropicUsageFromResponsesUsage(evt.Response.Usage)

View File

@ -293,20 +293,12 @@ func resToChatHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo
state.Finalized = true
finishReason := "stop"
if evt.Usage != nil {
state.Usage = chatUsageFromResponsesUsage(evt.Usage)
}
if evt.Response != nil {
if evt.Response.Usage != nil {
u := evt.Response.Usage
usage := &ChatUsage{
PromptTokens: u.InputTokens,
CompletionTokens: u.OutputTokens,
TotalTokens: u.InputTokens + u.OutputTokens,
}
if u.InputTokensDetails != nil && u.InputTokensDetails.CachedTokens > 0 {
usage.PromptTokensDetails = &ChatTokenDetails{
CachedTokens: u.InputTokensDetails.CachedTokens,
}
}
state.Usage = usage
state.Usage = chatUsageFromResponsesUsage(evt.Response.Usage)
}
switch evt.Response.Status {
@ -340,6 +332,23 @@ func resToChatHandleCompleted(evt *ResponsesStreamEvent, state *ResponsesEventTo
return chunks
}
func chatUsageFromResponsesUsage(u *ResponsesUsage) *ChatUsage {
if u == nil {
return nil
}
usage := &ChatUsage{
PromptTokens: u.InputTokens,
CompletionTokens: u.OutputTokens,
TotalTokens: u.InputTokens + u.OutputTokens,
}
if u.InputTokensDetails != nil && u.InputTokensDetails.CachedTokens > 0 {
usage.PromptTokensDetails = &ChatTokenDetails{
CachedTokens: u.InputTokensDetails.CachedTokens,
}
}
return usage
}
func makeChatDeltaChunk(state *ResponsesEventToChatState, delta ChatDelta) ChatCompletionsChunk {
return ChatCompletionsChunk{
ID: state.ID,

View File

@ -380,6 +380,8 @@ type ResponsesStreamEvent struct {
// response.created / response.completed / response.done / response.failed / response.incomplete
Response *ResponsesResponse `json:"response,omitempty"`
// 部分 OpenAI 兼容上游会把 usage 放在终止事件顶层,而不是 response.usage。
Usage *ResponsesUsage `json:"usage,omitempty"`
// response.output_item.added / response.output_item.done
Item *ResponsesOutput `json:"item,omitempty"`

View File

@ -76,7 +76,6 @@ func (s *OpenAIGatewayService) ForwardAsChatCompletions(
}
originalModel := chatReq.Model
clientStream := chatReq.Stream
includeUsage := chatReq.StreamOptions != nil && chatReq.StreamOptions.IncludeUsage
// 2. Resolve model mapping early so compat prompt_cache_key injection can
// derive a stable seed from the final upstream model family.
@ -291,7 +290,7 @@ func (s *OpenAIGatewayService) ForwardAsChatCompletions(
var result *OpenAIForwardResult
var handleErr error
if clientStream {
result, handleErr = s.handleChatStreamingResponse(resp, c, account, originalModel, billingModel, upstreamModel, includeUsage, startTime, len(body))
result, handleErr = s.handleChatStreamingResponse(resp, c, account, originalModel, billingModel, upstreamModel, startTime, len(body))
} else {
result, handleErr = s.handleChatBufferedStreamingResponse(resp, c, originalModel, billingModel, upstreamModel, startTime)
}
@ -417,7 +416,6 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
originalModel string,
billingModel string,
upstreamModel string,
includeUsage bool,
startTime time.Time,
requestBodyLen int,
) (*OpenAIForwardResult, error) {
@ -441,7 +439,9 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
state := apicompat.NewResponsesEventToChatState()
state.Model = originalModel
state.IncludeUsage = includeUsage
// 网关作为计费链路的一环,不能把下游 usage 输出绑定到客户端是否显式请求。
// raw Chat Completions 直转路径已经强制透出 usage这里保持同样行为避免级联代理计费为 0。
state.IncludeUsage = true
var usage OpenAIUsage
var firstTokenMs *int
@ -502,10 +502,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
}
refusalDetector.ObservePayload([]byte(payload))
// 仅按兼容转换器支持的终止事件提取 usage避免无意扩大事件语义。
isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type)
if isTerminalEvent && event.Response != nil && event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
if isTerminalEvent {
if event.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Usage)
}
if event.Response != nil && event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
}
}
chunks := apicompat.ResponsesEventToChatChunks(&event, state)

View File

@ -328,7 +328,6 @@ func TestHandleChatStreamingResponse_SilentRefusalReasoningSummaryExempt(t *test
"gpt-5.5",
"gpt-5.5",
"gpt-5.5",
false,
time.Now(),
openAISilentRefusalMinRequestBodyBytes,
)

View File

@ -180,6 +180,158 @@ func TestForwardAsChatCompletions_ClientDisconnectDrainsUpstreamUsage(t *testing
require.Equal(t, 4, result.Usage.CacheReadInputTokens)
}
func TestForwardAsChatCompletions_StreamsUsageWithoutClientStreamOptions(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := strings.Join([]string{
`data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`,
"",
`data: {"type":"response.output_text.delta","delta":"ok"}`,
"",
`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":13,"output_tokens":7,"total_tokens":20,"input_tokens_details":{"cached_tokens":5}}}}`,
"",
"data: [DONE]",
"",
}, "\n")
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_usage_no_stream_options"}},
Body: io.NopCloser(strings.NewReader(upstreamBody)),
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1")
require.NoError(t, err)
require.NotNil(t, result)
require.Equal(t, 13, result.Usage.InputTokens)
require.Equal(t, 7, result.Usage.OutputTokens)
require.Equal(t, 5, result.Usage.CacheReadInputTokens)
responseBody := rec.Body.String()
require.Contains(t, responseBody, `"usage"`)
require.Contains(t, responseBody, `"prompt_tokens":13`)
require.Contains(t, responseBody, `"completion_tokens":7`)
require.Contains(t, responseBody, `"cached_tokens":5`)
}
func TestForwardAsChatCompletions_StreamsTopLevelTerminalUsage(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := strings.Join([]string{
`data: {"type":"response.created","response":{"id":"resp_top","model":"gpt-5.4","status":"in_progress","output":[]}}`,
"",
`data: {"type":"response.output_text.delta","delta":"ok"}`,
"",
`data: {"type":"response.completed","response":{"id":"resp_top","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}]},"usage":{"input_tokens":21,"output_tokens":9,"total_tokens":30,"input_tokens_details":{"cached_tokens":4}}}`,
"",
"data: [DONE]",
"",
}, "\n")
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_top_level_usage"}},
Body: io.NopCloser(strings.NewReader(upstreamBody)),
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1")
require.NoError(t, err)
require.NotNil(t, result)
require.Equal(t, 21, result.Usage.InputTokens)
require.Equal(t, 9, result.Usage.OutputTokens)
require.Equal(t, 4, result.Usage.CacheReadInputTokens)
responseBody := rec.Body.String()
require.Contains(t, responseBody, `"usage"`)
require.Contains(t, responseBody, `"prompt_tokens":21`)
require.Contains(t, responseBody, `"completion_tokens":9`)
require.Contains(t, responseBody, `"cached_tokens":4`)
}
func TestForwardAsChatCompletions_BufferedTopLevelTerminalUsage(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":false}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
upstreamBody := strings.Join([]string{
`data: {"type":"response.completed","response":{"id":"resp_top_buffered","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}]},"usage":{"input_tokens":18,"output_tokens":6,"total_tokens":24,"input_tokens_details":{"cached_tokens":3}}}`,
"",
"data: [DONE]",
"",
}, "\n")
upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_buffered_top_level_usage"}},
Body: io.NopCloser(strings.NewReader(upstreamBody)),
}}
svc := &OpenAIGatewayService{httpUpstream: upstream}
account := &Account{
ID: 1,
Name: "openai-oauth",
Platform: PlatformOpenAI,
Type: AccountTypeOAuth,
Concurrency: 1,
Credentials: map[string]any{
"access_token": "oauth-token",
"chatgpt_account_id": "chatgpt-acc",
},
}
result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1")
require.NoError(t, err)
require.NotNil(t, result)
require.Equal(t, 18, result.Usage.InputTokens)
require.Equal(t, 6, result.Usage.OutputTokens)
require.Equal(t, 3, result.Usage.CacheReadInputTokens)
responseBody := rec.Body.String()
require.Contains(t, responseBody, `"usage"`)
require.Contains(t, responseBody, `"prompt_tokens":18`)
require.Contains(t, responseBody, `"completion_tokens":6`)
require.Contains(t, responseBody, `"cached_tokens":3`)
}
func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *testing.T) {
gin.SetMode(gin.TestMode)

View File

@ -570,6 +570,12 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal(
if err := json.Unmarshal([]byte(payload), &event); err == nil {
acc.ProcessEvent(&event)
if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil {
if event.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Usage)
if event.Response.Usage == nil {
event.Response.Usage = event.Usage
}
}
if event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
}
@ -611,6 +617,12 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal(
acc.ProcessEvent(&event)
if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil {
if event.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Usage)
if event.Response.Usage == nil {
event.Response.Usage = event.Usage
}
}
if event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
}
@ -713,14 +725,18 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
return false
}
// 仅按兼容转换器支持的终止事件提取 usage避免无意扩大事件语义。
isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type)
if isTerminalEvent && event.Response != nil {
if id := strings.TrimSpace(event.Response.ID); id != "" {
responseID = id
if isTerminalEvent {
if event.Response != nil {
if id := strings.TrimSpace(event.Response.ID); id != "" {
responseID = id
}
if event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
}
}
if event.Response.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage)
if event.Usage != nil {
usage = copyOpenAIUsageFromResponsesUsage(event.Usage)
}
}