fix: record zero OpenAI usage logs

This commit is contained in:
shaw 2026-05-03 17:11:52 +08:00
parent 72d5ee4cd1
commit 47fb38bca1
6 changed files with 85 additions and 25 deletions

View File

@ -336,7 +336,9 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing.
upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n") upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n")
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer upstreamStream.Close() defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{ upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_terminal_no_close"}}, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_terminal_no_close"}},
@ -389,7 +391,9 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi
upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n") upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}` + "\n\n")
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer upstreamStream.Close() defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{ upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_buffered_terminal_no_close"}}, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_buffered_terminal_no_close"}},

View File

@ -20,20 +20,29 @@ func (s *openAI403CounterResetStub) ResetOpenAI403Count(_ context.Context, accou
return nil return nil
} }
func TestOpenAIGatewayServiceRecordUsage_ResetsOpenAI403CounterBeforeZeroUsageReturn(t *testing.T) { func TestOpenAIGatewayServiceRecordUsage_ResetsOpenAI403CounterForZeroUsage(t *testing.T) {
counter := &openAI403CounterResetStub{} counter := &openAI403CounterResetStub{}
rateLimitSvc := NewRateLimitService(nil, nil, nil, nil, nil) rateLimitSvc := NewRateLimitService(nil, nil, nil, nil, nil)
rateLimitSvc.SetOpenAI403CounterCache(counter) rateLimitSvc.SetOpenAI403CounterCache(counter)
svc := &OpenAIGatewayService{ usageRepo := &openAIRecordUsageLogRepoStub{inserted: true}
rateLimitService: rateLimitSvc, billingRepo := &openAIRecordUsageBillingRepoStub{result: &UsageBillingApplyResult{Applied: true}}
} userRepo := &openAIRecordUsageUserRepoStub{}
subRepo := &openAIRecordUsageSubRepoStub{}
svc := newOpenAIRecordUsageServiceWithBillingRepoForTest(usageRepo, billingRepo, userRepo, subRepo, nil)
svc.rateLimitService = rateLimitSvc
err := svc.RecordUsage(context.Background(), &OpenAIRecordUsageInput{ err := svc.RecordUsage(context.Background(), &OpenAIRecordUsageInput{
Result: &OpenAIForwardResult{}, Result: &OpenAIForwardResult{
RequestID: "resp_zero_usage_reset_403",
Model: "gpt-5.1",
},
APIKey: &APIKey{ID: 1001, Group: &Group{RateMultiplier: 1}},
User: &User{ID: 2001},
Account: &Account{ID: 777, Platform: PlatformOpenAI}, Account: &Account{ID: 777, Platform: PlatformOpenAI},
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []int64{777}, counter.resetCalls) require.Equal(t, []int64{777}, counter.resetCalls)
require.Equal(t, 1, usageRepo.calls)
} }

View File

@ -156,7 +156,9 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te
upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n") upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n")
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer upstreamStream.Close() defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{ upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_terminal_no_close"}}, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_terminal_no_close"}},
@ -209,7 +211,9 @@ func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t
upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n") upstreamBody := []byte(`data: {"type":"response.completed","response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}` + "\n\n")
upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody)
defer upstreamStream.Close() defer func() {
require.NoError(t, upstreamStream.Close())
}()
upstream := &httpUpstreamRecorder{resp: &http.Response{ upstream := &httpUpstreamRecorder{resp: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_buffered_terminal_no_close"}}, Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_buffered_terminal_no_close"}},

View File

@ -441,13 +441,13 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal(
return nil, usage, acc, ev.err return nil, usage, acc, ev.err
} }
if isOpenAICompatDoneSentinelLine(ev.line) {
return nil, usage, acc, nil
}
payload, ok := extractOpenAISSEDataLine(ev.line) payload, ok := extractOpenAISSEDataLine(ev.line)
if !ok || payload == "" { if !ok || payload == "" {
continue continue
} }
if strings.TrimSpace(payload) == "[DONE]" {
return nil, usage, acc, nil
}
var event apicompat.ResponsesStreamEvent var event apicompat.ResponsesStreamEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil { if err := json.Unmarshal([]byte(payload), &event); err != nil {
@ -640,13 +640,13 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
if streamInterval <= 0 && keepaliveInterval <= 0 { if streamInterval <= 0 && keepaliveInterval <= 0 {
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
if isOpenAICompatDoneSentinelLine(line) {
return missingTerminalErr()
}
payload, ok := extractOpenAISSEDataLine(line) payload, ok := extractOpenAISSEDataLine(line)
if !ok { if !ok {
continue continue
} }
if strings.TrimSpace(payload) == "[DONE]" {
return missingTerminalErr()
}
if processDataLine(payload) { if processDataLine(payload) {
return finalizeStream() return finalizeStream()
} }
@ -713,13 +713,13 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse(
} }
lastDataAt = time.Now() lastDataAt = time.Now()
line := ev.line line := ev.line
if isOpenAICompatDoneSentinelLine(line) {
return missingTerminalErr()
}
payload, ok := extractOpenAISSEDataLine(line) payload, ok := extractOpenAISSEDataLine(line)
if !ok { if !ok {
continue continue
} }
if strings.TrimSpace(payload) == "[DONE]" {
return missingTerminalErr()
}
if processDataLine(payload) { if processDataLine(payload) {
return finalizeStream() return finalizeStream()
} }

View File

@ -186,6 +186,56 @@ func max(a, b int) int {
return b return b
} }
func TestOpenAIGatewayServiceRecordUsage_ZeroUsageStillWritesUsageLog(t *testing.T) {
usageRepo := &openAIRecordUsageLogRepoStub{inserted: true}
billingRepo := &openAIRecordUsageBillingRepoStub{result: &UsageBillingApplyResult{Applied: true}}
userRepo := &openAIRecordUsageUserRepoStub{}
subRepo := &openAIRecordUsageSubRepoStub{}
quotaSvc := &openAIRecordUsageAPIKeyQuotaStub{}
svc := newOpenAIRecordUsageServiceWithBillingRepoForTest(usageRepo, billingRepo, userRepo, subRepo, nil)
err := svc.RecordUsage(context.Background(), &OpenAIRecordUsageInput{
Result: &OpenAIForwardResult{
RequestID: "resp_zero_usage",
Usage: OpenAIUsage{},
Model: "gpt-5.1",
Duration: time.Second,
},
APIKey: &APIKey{ID: 1000, Quota: 100, Group: &Group{RateMultiplier: 1}},
User: &User{ID: 2000},
Account: &Account{ID: 3000, Type: AccountTypeAPIKey},
APIKeyService: quotaSvc,
})
require.NoError(t, err)
require.Equal(t, 1, billingRepo.calls)
require.Equal(t, 1, usageRepo.calls)
require.Equal(t, 0, userRepo.deductCalls)
require.Equal(t, 0, subRepo.incrementCalls)
require.Equal(t, 0, quotaSvc.quotaCalls)
require.Equal(t, 0, quotaSvc.rateLimitCalls)
require.NotNil(t, usageRepo.lastLog)
require.Equal(t, "resp_zero_usage", usageRepo.lastLog.RequestID)
require.Zero(t, usageRepo.lastLog.InputTokens)
require.Zero(t, usageRepo.lastLog.OutputTokens)
require.Zero(t, usageRepo.lastLog.CacheCreationTokens)
require.Zero(t, usageRepo.lastLog.CacheReadTokens)
require.Zero(t, usageRepo.lastLog.ImageOutputTokens)
require.Zero(t, usageRepo.lastLog.ImageCount)
require.Zero(t, usageRepo.lastLog.InputCost)
require.Zero(t, usageRepo.lastLog.OutputCost)
require.Zero(t, usageRepo.lastLog.TotalCost)
require.Zero(t, usageRepo.lastLog.ActualCost)
require.NotNil(t, billingRepo.lastCmd)
require.Zero(t, billingRepo.lastCmd.BalanceCost)
require.Zero(t, billingRepo.lastCmd.SubscriptionCost)
require.Zero(t, billingRepo.lastCmd.APIKeyQuotaCost)
require.Zero(t, billingRepo.lastCmd.APIKeyRateLimitCost)
require.Zero(t, billingRepo.lastCmd.AccountQuotaCost)
}
func TestOpenAIGatewayServiceRecordUsage_UsesUserSpecificGroupRate(t *testing.T) { func TestOpenAIGatewayServiceRecordUsage_UsesUserSpecificGroupRate(t *testing.T) {
groupID := int64(11) groupID := int64(11)
groupRate := 1.4 groupRate := 1.4

View File

@ -5041,13 +5041,6 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
s.rateLimitService.ResetOpenAI403Counter(ctx, input.Account.ID) s.rateLimitService.ResetOpenAI403Counter(ctx, input.Account.ID)
} }
// 跳过所有 token 均为零的用量记录——上游未返回 usage 时不应写入数据库
if result.Usage.InputTokens == 0 && result.Usage.OutputTokens == 0 &&
result.Usage.CacheCreationInputTokens == 0 && result.Usage.CacheReadInputTokens == 0 &&
result.Usage.ImageOutputTokens == 0 && result.ImageCount == 0 {
return nil
}
apiKey := input.APIKey apiKey := input.APIKey
user := input.User user := input.User
account := input.Account account := input.Account