From 47fb38bca13fd53d0c85e9db113c5405a6d6b3e4 Mon Sep 17 00:00:00 2001 From: shaw Date: Sun, 3 May 2026 17:11:52 +0800 Subject: [PATCH] fix: record zero OpenAI usage logs --- .../service/openai_compat_model_test.go | 8 ++- .../service/openai_gateway_403_reset_test.go | 19 +++++-- .../openai_gateway_chat_completions_test.go | 8 ++- .../service/openai_gateway_messages.go | 18 +++---- .../openai_gateway_record_usage_test.go | 50 +++++++++++++++++++ .../service/openai_gateway_service.go | 7 --- 6 files changed, 85 insertions(+), 25 deletions(-) diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index 1129bf04..840784bf 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -336,7 +336,9 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing. upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n") upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) - defer upstreamStream.Close() + defer func() { + require.NoError(t, upstreamStream.Close()) + }() upstream := &httpUpstreamRecorder{resp: &http.Response{ StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_terminal_no_close"}}, @@ -389,7 +391,9 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n") upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) - defer upstreamStream.Close() + defer func() { + require.NoError(t, upstreamStream.Close()) + }() upstream := &httpUpstreamRecorder{resp: &http.Response{ StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_buffered_terminal_no_close"}}, diff --git a/backend/internal/service/openai_gateway_403_reset_test.go b/backend/internal/service/openai_gateway_403_reset_test.go index c6805464..440b94a9 100644 --- a/backend/internal/service/openai_gateway_403_reset_test.go +++ b/backend/internal/service/openai_gateway_403_reset_test.go @@ -20,20 +20,29 @@ func (s *openAI403CounterResetStub) ResetOpenAI403Count(_ context.Context, accou return nil } -func TestOpenAIGatewayServiceRecordUsage_ResetsOpenAI403CounterBeforeZeroUsageReturn(t *testing.T) { +func TestOpenAIGatewayServiceRecordUsage_ResetsOpenAI403CounterForZeroUsage(t *testing.T) { counter := &openAI403CounterResetStub{} rateLimitSvc := NewRateLimitService(nil, nil, nil, nil, nil) rateLimitSvc.SetOpenAI403CounterCache(counter) - svc := &OpenAIGatewayService{ - rateLimitService: rateLimitSvc, - } + usageRepo := &openAIRecordUsageLogRepoStub{inserted: true} + billingRepo := &openAIRecordUsageBillingRepoStub{result: &UsageBillingApplyResult{Applied: true}} + userRepo := &openAIRecordUsageUserRepoStub{} + subRepo := &openAIRecordUsageSubRepoStub{} + svc := newOpenAIRecordUsageServiceWithBillingRepoForTest(usageRepo, billingRepo, userRepo, subRepo, nil) + svc.rateLimitService = rateLimitSvc err := svc.RecordUsage(context.Background(), &OpenAIRecordUsageInput{ - Result: &OpenAIForwardResult{}, + Result: &OpenAIForwardResult{ + RequestID: "resp_zero_usage_reset_403", + Model: "gpt-5.1", + }, + APIKey: &APIKey{ID: 1001, Group: &Group{RateMultiplier: 1}}, + User: &User{ID: 2001}, Account: &Account{ID: 777, Platform: PlatformOpenAI}, }) require.NoError(t, err) require.Equal(t, []int64{777}, counter.resetCalls) + require.Equal(t, 1, usageRepo.calls) } diff --git a/backend/internal/service/openai_gateway_chat_completions_test.go b/backend/internal/service/openai_gateway_chat_completions_test.go index 1236fb2c..c129a4df 100644 --- a/backend/internal/service/openai_gateway_chat_completions_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_test.go @@ -156,7 +156,9 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n") upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) - defer upstreamStream.Close() + defer func() { + require.NoError(t, upstreamStream.Close()) + }() upstream := &httpUpstreamRecorder{resp: &http.Response{ StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_terminal_no_close"}}, @@ -209,7 +211,9 @@ func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n") upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) - defer upstreamStream.Close() + defer func() { + require.NoError(t, upstreamStream.Close()) + }() upstream := &httpUpstreamRecorder{resp: &http.Response{ StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_buffered_terminal_no_close"}}, diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index 9fd6f04c..5f3bf5c1 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -441,13 +441,13 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( return nil, usage, acc, ev.err } + if isOpenAICompatDoneSentinelLine(ev.line) { + return nil, usage, acc, nil + } payload, ok := extractOpenAISSEDataLine(ev.line) if !ok || payload == "" { continue } - if strings.TrimSpace(payload) == "[DONE]" { - return nil, usage, acc, nil - } var event apicompat.ResponsesStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { @@ -640,13 +640,13 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( if streamInterval <= 0 && keepaliveInterval <= 0 { for scanner.Scan() { line := scanner.Text() + if isOpenAICompatDoneSentinelLine(line) { + return missingTerminalErr() + } payload, ok := extractOpenAISSEDataLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { - return missingTerminalErr() - } if processDataLine(payload) { return finalizeStream() } @@ -713,13 +713,13 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( } lastDataAt = time.Now() line := ev.line + if isOpenAICompatDoneSentinelLine(line) { + return missingTerminalErr() + } payload, ok := extractOpenAISSEDataLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { - return missingTerminalErr() - } if processDataLine(payload) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_record_usage_test.go b/backend/internal/service/openai_gateway_record_usage_test.go index 47ff4e3b..76fbb794 100644 --- a/backend/internal/service/openai_gateway_record_usage_test.go +++ b/backend/internal/service/openai_gateway_record_usage_test.go @@ -186,6 +186,56 @@ func max(a, b int) int { return b } +func TestOpenAIGatewayServiceRecordUsage_ZeroUsageStillWritesUsageLog(t *testing.T) { + usageRepo := &openAIRecordUsageLogRepoStub{inserted: true} + billingRepo := &openAIRecordUsageBillingRepoStub{result: &UsageBillingApplyResult{Applied: true}} + userRepo := &openAIRecordUsageUserRepoStub{} + subRepo := &openAIRecordUsageSubRepoStub{} + quotaSvc := &openAIRecordUsageAPIKeyQuotaStub{} + svc := newOpenAIRecordUsageServiceWithBillingRepoForTest(usageRepo, billingRepo, userRepo, subRepo, nil) + + err := svc.RecordUsage(context.Background(), &OpenAIRecordUsageInput{ + Result: &OpenAIForwardResult{ + RequestID: "resp_zero_usage", + Usage: OpenAIUsage{}, + Model: "gpt-5.1", + Duration: time.Second, + }, + APIKey: &APIKey{ID: 1000, Quota: 100, Group: &Group{RateMultiplier: 1}}, + User: &User{ID: 2000}, + Account: &Account{ID: 3000, Type: AccountTypeAPIKey}, + APIKeyService: quotaSvc, + }) + + require.NoError(t, err) + require.Equal(t, 1, billingRepo.calls) + require.Equal(t, 1, usageRepo.calls) + require.Equal(t, 0, userRepo.deductCalls) + require.Equal(t, 0, subRepo.incrementCalls) + require.Equal(t, 0, quotaSvc.quotaCalls) + require.Equal(t, 0, quotaSvc.rateLimitCalls) + + require.NotNil(t, usageRepo.lastLog) + require.Equal(t, "resp_zero_usage", usageRepo.lastLog.RequestID) + require.Zero(t, usageRepo.lastLog.InputTokens) + require.Zero(t, usageRepo.lastLog.OutputTokens) + require.Zero(t, usageRepo.lastLog.CacheCreationTokens) + require.Zero(t, usageRepo.lastLog.CacheReadTokens) + require.Zero(t, usageRepo.lastLog.ImageOutputTokens) + require.Zero(t, usageRepo.lastLog.ImageCount) + require.Zero(t, usageRepo.lastLog.InputCost) + require.Zero(t, usageRepo.lastLog.OutputCost) + require.Zero(t, usageRepo.lastLog.TotalCost) + require.Zero(t, usageRepo.lastLog.ActualCost) + + require.NotNil(t, billingRepo.lastCmd) + require.Zero(t, billingRepo.lastCmd.BalanceCost) + require.Zero(t, billingRepo.lastCmd.SubscriptionCost) + require.Zero(t, billingRepo.lastCmd.APIKeyQuotaCost) + require.Zero(t, billingRepo.lastCmd.APIKeyRateLimitCost) + require.Zero(t, billingRepo.lastCmd.AccountQuotaCost) +} + func TestOpenAIGatewayServiceRecordUsage_UsesUserSpecificGroupRate(t *testing.T) { groupID := int64(11) groupRate := 1.4 diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index d1d73586..b818fa4a 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -5041,13 +5041,6 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec s.rateLimitService.ResetOpenAI403Counter(ctx, input.Account.ID) } - // 跳过所有 token 均为零的用量记录——上游未返回 usage 时不应写入数据库 - if result.Usage.InputTokens == 0 && result.Usage.OutputTokens == 0 && - result.Usage.CacheCreationInputTokens == 0 && result.Usage.CacheReadInputTokens == 0 && - result.Usage.ImageOutputTokens == 0 && result.ImageCount == 0 { - return nil - } - apiKey := input.APIKey user := input.User account := input.Account