fix(antigravity): capture message_start input_tokens in streaming passthrough

The antigravity upstream-passthrough path (account.Type == AccountTypeUpstream
forwarding to a Claude-format upstream) drains the SSE stream via
streamUpstreamResponse + extractSSEUsage. The extractor only reads top-level
event["usage"], which matches Anthropic's message_delta but misses
message_start where usage is nested under event.message.usage.

As a result, every streaming /v1/messages request through this path drops
the input-side fields (input_tokens, cache_read_input_tokens, cache_creation_*)
and writes a usage_logs row with input_tokens=0 + output_tokens>0. The user
in #2332 observed 2,728 such rows attributed to claude-opus-4-6 / haiku-4-5
streaming requests; their billing on output is correct but the input-side
accounting is missing. (Their "duplicate write from message_delta" hypothesis
isn't borne out by the code — RecordUsage is invoked once per request and
writeUsageLogBestEffort dedupes by request_id; what they're seeing is
single records produced by this buggy extractor.)

Branch on event.type so message_start reads from event.message.usage and
other events keep using event.usage, matching how parseSSEUsagePassthrough
already handles both shapes for the Anthropic OAuth / API-key / Bedrock paths.
Adds two extractSSEUsage table cases plus a TestExtractSSEUsage_StreamingSequence
that drives the message_start → message_delta sequence end-to-end; both fail
on main and pass with this change.

Fixes #2332

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Pluviobyte 2026-05-27 09:02:15 +00:00
parent b0142146af
commit 1e6d0b602a
No known key found for this signature in database
2 changed files with 53 additions and 2 deletions

View File

@ -4463,6 +4463,14 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp
}
// extractSSEUsage 从 SSE data 行中提取 Claude usage用于流式透传场景
//
// Anthropic streaming 的 usage 字段分布在两类事件中:
// - message_start嵌套在 event.message.usageinput_tokens、cache_creation_input_tokens、
// cache_read_input_tokens 等输入侧字段)
// - message_delta位于顶层 event.usage流结束时的最终 output_tokens
//
// 仅读取顶层 event.usage 会漏掉 message_start 的输入侧字段,导致流式透传请求落库的
// usage_logs 记录 input_tokens=0。
func (s *AntigravityGatewayService) extractSSEUsage(line string, usage *ClaudeUsage) {
if !strings.HasPrefix(line, "data: ") {
return
@ -4472,8 +4480,15 @@ func (s *AntigravityGatewayService) extractSSEUsage(line string, usage *ClaudeUs
if json.Unmarshal([]byte(dataStr), &event) != nil {
return
}
u, ok := event["usage"].(map[string]any)
if !ok {
var u map[string]any
if eventType, _ := event["type"].(string); eventType == "message_start" {
if msg, ok := event["message"].(map[string]any); ok {
u, _ = msg["usage"].(map[string]any)
}
} else {
u, _ = event["usage"].(map[string]any)
}
if u == nil {
return
}
if v, ok := u["input_tokens"].(float64); ok && int(v) > 0 {

View File

@ -1301,6 +1301,19 @@ func TestExtractSSEUsage(t *testing.T) {
line: `data: {"usage":{"input_tokens":10,"output_tokens":20,"cache_read_input_tokens":5,"cache_creation_input_tokens":3}}`,
expected: ClaudeUsage{InputTokens: 10, OutputTokens: 20, CacheReadInputTokens: 5, CacheCreationInputTokens: 3},
},
{
// Anthropic message_start 把 usage 嵌套在 message.usage 下,
// 必须从这里提取输入侧字段(含 cache_read/cache_creation_input_tokens
name: "message_start nested usage with input/cache tokens",
line: `data: {"type":"message_start","message":{"id":"msg_01","usage":{"input_tokens":35576,"cache_creation_input_tokens":0,"cache_read_input_tokens":12000,"output_tokens":1}}}`,
expected: ClaudeUsage{InputTokens: 35576, OutputTokens: 1, CacheReadInputTokens: 12000},
},
{
// message_start.message.usage.cache_creation 内的 5m/1h 明细也要解析。
name: "message_start nested usage with cache_creation breakdown",
line: `data: {"type":"message_start","message":{"usage":{"input_tokens":100,"cache_creation":{"ephemeral_5m_input_tokens":30,"ephemeral_1h_input_tokens":70}}}}`,
expected: ClaudeUsage{InputTokens: 100, CacheCreation5mTokens: 30, CacheCreation1hTokens: 70},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -1311,6 +1324,29 @@ func TestExtractSSEUsage(t *testing.T) {
}
}
// TestExtractSSEUsage_StreamingSequence 复现 issue #2332完整的 Anthropic streaming
// 序列message_start → message_delta必须把两类事件中的 usage 字段都汇入同一份累计值,
// 否则透传账号产出的 usage_logs 会出现 input_tokens=0、仅有 output_tokens 的"残缺"记录。
func TestExtractSSEUsage_StreamingSequence(t *testing.T) {
svc := &AntigravityGatewayService{}
usage := &ClaudeUsage{}
// 1) message_start携带完整输入侧 usageinput_tokens + cache_read
svc.extractSSEUsage(
`data: {"type":"message_start","message":{"id":"msg_01","type":"message","role":"assistant","content":[],"model":"claude-opus-4-6","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":35576,"cache_creation_input_tokens":0,"cache_read_input_tokens":12000,"output_tokens":1}}}`,
usage,
)
// 2) message_delta流结束时只带 output_tokens无 input_tokens 字段)
svc.extractSSEUsage(
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":816}}`,
usage,
)
require.Equal(t, 35576, usage.InputTokens, "message_start 的 input_tokens 必须被记录,否则记账会缺失输入侧 token (#2332)")
require.Equal(t, 12000, usage.CacheReadInputTokens, "message_start 的 cache_read_input_tokens 必须被记录")
require.Equal(t, 816, usage.OutputTokens, "message_delta 的最终 output_tokens 必须被记录")
}
// TestAntigravityClientWriter 验证 antigravityClientWriter 的断开检测
func TestAntigravityClientWriter(t *testing.T) {
t.Run("normal write succeeds", func(t *testing.T) {