// Package telemetry simulates the real Claude Code CLI's OTEL telemetry events. // // Real CLI emits events to two channels: // 1. Anthropic event_logging/batch (first-party events) // 2. Datadog log intake (third-party observability) // // Ported from antigravity/node-tls-proxy/proxy.js — see that file for JS original. package telemetry import ( "bytes" "crypto/sha256" "encoding/base64" "encoding/hex" "encoding/json" "fmt" "log/slog" "math" "math/rand" "net/http" "strings" "sync" "time" claude "github.com/Wei-Shaw/sub2api/internal/pkg/claude" ) // ─── Constants ─────────────────────────────────────────── const ( ddAPIKey = "pubbbf48e6d78dae54bceaa4acf463299bf" buildTime = "2026-04-12T01:48:09Z" sessionMaxAge = time.Hour sessionCleanup = 5 * time.Minute telemetryTimeout = 10 * time.Second firstPartyBatchSize = 24 firstPartyFlushInterval = 1500 * time.Millisecond firstPartyRetryBaseDelay = 2 * time.Second firstPartyRetryMaxAttempt = 3 firstPartyFailedBatchCap = 64 ) func fakeNodeVersion() string { return claude.DefaultStainlessRuntimeVersion } // ─── Virtual Host Identity ─────────────────────────────── var ( mbpNames = []string{"alex", "sam", "chris", "max", "lee", "kai", "jamie", "taylor", "morgan", "casey", "drew", "avery", "riley", "blake", "jordan", "ryan", "parker", "quinn", "reese", "cameron"} mbpSuffix = []string{"-MBP", "-MacBook", "-MacBook-Pro", "-MacBook-Air", "s-MBP", "s-MacBook", "s-MacBook-Pro"} ) type hostIdentity struct { Hostname string Username string Terminal string Shell string MachineID string Arch string OSVersion string KernelRelease string ExecPath string RipgrepVersion string RipgrepPath string McpServerCount int McpFailCount int } func hashField(seed, field string) []byte { h := sha256.Sum256([]byte(seed + ":" + field)) return h[:] } func generateHostIdentity(seed string) hostIdentity { hb := hashField(seed, "hostname") name := mbpNames[int(hb[0])%len(mbpNames)] sfx := mbpSuffix[int(hb[1])%len(mbpSuffix)] termRoll := int(hashField(seed, "terminal")[0]) % 100 var terminal string switch { case termRoll < 75: terminal = "xterm-256color" case termRoll < 88: terminal = "screen-256color" case termRoll < 96: terminal = "alacritty" default: terminal = "kitty" } shellRoll := int(hashField(seed, "shell")[0]) % 100 var shell string switch { case shellRoll < 65: shell = "/bin/zsh" case shellRoll < 82: shell = "/usr/local/bin/zsh" case shellRoll < 93: shell = "/bin/bash" default: shell = "/opt/homebrew/bin/fish" } mid := hashField(seed, "machine-id") machineID := fmt.Sprintf("%s-%s-%s-%s-%s", strings.ToUpper(hex.EncodeToString(mid[0:4])), strings.ToUpper(hex.EncodeToString(mid[4:6])), strings.ToUpper(hex.EncodeToString(mid[6:8])), strings.ToUpper(hex.EncodeToString(mid[8:10])), strings.ToUpper(hex.EncodeToString(mid[10:16])), ) osb := hashField(seed, "os") major := 13 + int(osb[0])%3 minor := int(osb[1]) % 8 patch := int(osb[2]) % 5 darwinMajor := major + 9 // macOS 13 = Darwin 22 darwinMinor := int(osb[3]) % 7 darwinPatch := int(osb[4]) % 3 archRoll := int(hashField(seed, "arch")[0]) % 100 arch := "arm64" if archRoll >= 70 { arch = "x64" } execRoll := int(hashField(seed, "exec")[0]) % 100 var execPath string switch { case execRoll < 40: execPath = "/usr/local/bin/claude" case execRoll < 70: execPath = "/opt/homebrew/bin/claude" case execRoll < 90: execPath = fmt.Sprintf("/Users/%s/.npm-global/bin/claude", name) default: execPath = fmt.Sprintf("/Users/%s/.local/bin/claude", name) } rgVersions := []string{"14.1.1", "14.1.0", "14.0.3", "14.0.2", "13.0.0", "14.1.2", "14.0.1"} rgPaths := []string{"/opt/homebrew/bin/rg", "/usr/local/bin/rg", "/Users/" + name + "/.cargo/bin/rg", "/usr/bin/rg"} rb := hashField(seed, "ripgrep") return hostIdentity{ Hostname: name + sfx, Username: name, Terminal: terminal, Shell: shell, MachineID: machineID, Arch: arch, OSVersion: fmt.Sprintf("%d.%d.%d", major, minor, patch), KernelRelease: fmt.Sprintf("%d.%d.%d", darwinMajor, darwinMinor, darwinPatch), ExecPath: execPath, RipgrepVersion: rgVersions[int(rb[0])%len(rgVersions)], RipgrepPath: rgPaths[int(rb[1])%len(rgPaths)], McpServerCount: int(rb[2])%5 + 1, McpFailCount: int(rb[3]) % 3, } } // ─── Session State ─────────────────────────────────────── type sessionState struct { SessionID string DeviceID string HostID hostIdentity StartTime time.Time RequestCount int64 RipgrepReported bool LastModel string LastBetas string LastAuthToken string LastAuthHeader string LastRequestAt time.Time LastStatusCode int } var ( sessions = make(map[string]*sessionState) sessionsMu sync.Mutex ) func init() { go func() { ticker := time.NewTicker(sessionCleanup) defer ticker.Stop() for range ticker.C { now := time.Now() sessionsMu.Lock() for k, s := range sessions { if now.Sub(s.StartTime) > sessionMaxAge { delete(sessions, k) } } sessionsMu.Unlock() } }() } func generateDeviceID(accountSeed string) string { h := sha256.Sum256([]byte("device:" + accountSeed)) return hex.EncodeToString(h[:]) } func getOrCreateSession(deviceID string) *sessionState { sessionsMu.Lock() defer sessionsMu.Unlock() if s, ok := sessions[deviceID]; ok { return s } s := &sessionState{ SessionID: generateUUID(), DeviceID: deviceID, HostID: generateHostIdentity(deviceID), StartTime: time.Now(), } sessions[deviceID] = s return s } func getSession(deviceID string) *sessionState { sessionsMu.Lock() defer sessionsMu.Unlock() return sessions[deviceID] } func deleteSession(deviceID string) { sessionsMu.Lock() delete(sessions, deviceID) sessionsMu.Unlock() } func generateUUID() string { b := make([]byte, 16) rand.Read(b) b[6] = (b[6] & 0x0f) | 0x40 // version 4 b[8] = (b[8] & 0x3f) | 0x80 // variant return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) } // ─── Process Metrics Simulation ────────────────────────── func buildProcessMetrics(uptime float64) string { baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000) rss := int64(baseRss + rand.Float64()*80_000_000) heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000) heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3) metrics := map[string]any{ "uptime": uptime, "rss": rss, "heapTotal": heapTotal, "heapUsed": heapUsed, "external": 14_000_000 + rand.Intn(2_000_000), "arrayBuffers": rand.Intn(200_000), "constrainedMemory": 51539607552, "cpuUsage": map[string]int64{ "user": int64(uptime*10_000 + rand.Float64()*300_000), "system": int64(uptime*2_000 + rand.Float64()*80_000), }, "cpuPercent": rand.Float64() * 200, } data, _ := json.Marshal(metrics) return base64.StdEncoding.EncodeToString(data) } // ─── Env Block ─────────────────────────────────────────── func buildEnvBlock(hostID hostIdentity) map[string]any { return map[string]any{ "platform": "darwin", "node_version": fakeNodeVersion(), "terminal": hostID.Terminal, "package_managers": "npm,pnpm", "runtimes": "deno,node", "is_running_with_bun": true, "is_ci": false, "is_claubbit": false, "is_github_action": false, "is_claude_code_action": false, "is_claude_ai_auth": false, "version": claude.DefaultCLIVersion, "arch": hostID.Arch, "is_claude_code_remote": false, "deployment_environment": "unknown-darwin", "is_conductor": false, "version_base": claude.DefaultCLIVersion, "build_time": buildTime, "is_local_agent_mode": false, "vcs": "git", "platform_raw": "darwin", } } // ─── Event Building ────────────────────────────────────── type eventWrapper struct { EventType string `json:"event_type"` EventData map[string]any `json:"event_data"` } func buildEvent(eventName string, session *sessionState, model, betas string, extraData map[string]any, tsOverride string) eventWrapper { uptime := time.Since(session.StartTime).Seconds() pm := buildProcessMetrics(uptime) ts := tsOverride if ts == "" { ts = time.Now().UTC().Format(time.RFC3339Nano) } if model == "" { model = "claude-sonnet-4-6" } if betas == "" { betas = "claude-code-20250219,interleaved-thinking-2025-05-14" } data := map[string]any{ "event_name": eventName, "client_timestamp": ts, "model": model, "session_id": session.SessionID, "user_type": "external", "betas": betas, "env": buildEnvBlock(session.HostID), "entrypoint": "cli", "is_interactive": true, "client_type": "cli", "process": pm, "event_id": generateUUID(), "device_id": session.DeviceID, } for k, v := range extraData { data[k] = v } return eventWrapper{ EventType: "ClaudeCodeInternalEvent", EventData: data, } } // ─── Send Functions ────────────────────────────────────── var httpClient = &http.Client{Timeout: telemetryTimeout} type queuedTelemetryBatch struct { authToken string events []eventWrapper attempt int } var ( telemetryQueueMu sync.Mutex pendingTelemetry = make(map[string][]eventWrapper) flushTimer *time.Timer failedTelemetryQueue []queuedTelemetryBatch retryTimer *time.Timer ) func sendTelemetryEvents(events []eventWrapper, _ *sessionState, authToken string) { if len(events) == 0 { return } enqueueTelemetryEvents(authToken, events) } func enqueueTelemetryEvents(authToken string, events []eventWrapper) { if len(events) == 0 { return } telemetryQueueMu.Lock() pendingTelemetry[authToken] = append(pendingTelemetry[authToken], events...) if len(pendingTelemetry[authToken]) >= firstPartyBatchSize { batch := queuedTelemetryBatch{ authToken: authToken, events: append([]eventWrapper(nil), pendingTelemetry[authToken]...), attempt: 1, } delete(pendingTelemetry, authToken) telemetryQueueMu.Unlock() go sendBatchWithRetry(batch) return } scheduleFlushLocked() telemetryQueueMu.Unlock() } func scheduleFlushLocked() { if flushTimer != nil { return } flushTimer = time.AfterFunc(firstPartyFlushInterval, flushPendingTelemetryBatches) } func flushPendingTelemetryBatches() { telemetryQueueMu.Lock() batches := make([]queuedTelemetryBatch, 0, len(pendingTelemetry)) for authToken, events := range pendingTelemetry { if len(events) == 0 { continue } batches = append(batches, queuedTelemetryBatch{ authToken: authToken, events: append([]eventWrapper(nil), events...), attempt: 1, }) } pendingTelemetry = make(map[string][]eventWrapper) flushTimer = nil telemetryQueueMu.Unlock() for _, batch := range batches { go sendBatchWithRetry(batch) } } func sendBatchWithRetry(batch queuedTelemetryBatch) { if len(batch.events) == 0 { return } if err := postTelemetryBatch(batch.authToken, batch.events); err == nil { slog.Debug("telemetry_sent", "events", len(batch.events), "attempt", batch.attempt) return } else if batch.attempt < firstPartyRetryMaxAttempt { delay := firstPartyRetryBaseDelay * time.Duration(1<<(batch.attempt-1)) slog.Debug("telemetry_retry", "attempt", batch.attempt, "delay_ms", delay.Milliseconds(), "error", err.Error()) next := batch next.attempt++ time.AfterFunc(delay, func() { sendBatchWithRetry(next) }) return } else { slog.Debug("telemetry_queue_failed", "events", len(batch.events), "error", err.Error()) queueFailedEvents(batch) } } func queueFailedEvents(batch queuedTelemetryBatch) { telemetryQueueMu.Lock() if len(failedTelemetryQueue) >= firstPartyFailedBatchCap { failedTelemetryQueue = failedTelemetryQueue[1:] } failedTelemetryQueue = append(failedTelemetryQueue, queuedTelemetryBatch{ authToken: batch.authToken, events: append([]eventWrapper(nil), batch.events...), attempt: 1, }) scheduleRetryFailedEventsLocked() telemetryQueueMu.Unlock() } func scheduleRetryFailedEventsLocked() { if retryTimer != nil { return } retryTimer = time.AfterFunc(firstPartyRetryBaseDelay*4, retryFailedEvents) } func retryFailedEvents() { telemetryQueueMu.Lock() batches := failedTelemetryQueue failedTelemetryQueue = nil retryTimer = nil telemetryQueueMu.Unlock() for _, batch := range batches { go sendBatchWithRetry(batch) } } func postTelemetryBatch(authToken string, events []eventWrapper) error { payload := map[string]any{"events": events} body, err := json.Marshal(payload) if err != nil { return err } req, err := http.NewRequest("POST", "https://api.anthropic.com/api/event_logging/batch", bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Accept", "application/json, text/plain, */*") req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", claude.DefaultCodeUserAgent()) req.Header.Set("x-service-name", "claude-code") if authToken != "" { req.Header.Set("Authorization", "Bearer "+authToken) } resp, err := httpClient.Do(req) if err != nil { return err } resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("event_logging status %d", resp.StatusCode) } return nil } func sendDatadogLog(eventName string, session *sessionState, model string) { hostID := session.HostID uptime := time.Since(session.StartTime).Seconds() if model == "" { model = "claude-sonnet-4-6" } baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000) rss := int64(baseRss + rand.Float64()*80_000_000) heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000) heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3) pm := map[string]any{ "uptime": uptime, "rss": rss, "heapTotal": heapTotal, "heapUsed": heapUsed, "external": 14_000_000 + rand.Intn(2_000_000), "arrayBuffers": rand.Intn(10_000), "constrainedMemory": 0, "cpuUsage": map[string]int64{ "user": int64(uptime*10_000 + rand.Float64()*300_000), "system": int64(uptime*2_000 + rand.Float64()*80_000), }, } entry := map[string]any{ "ddsource": "nodejs", "ddtags": fmt.Sprintf("event:%s,arch:%s,client_type:cli,model:%s,platform:darwin,user_type:external,version:%s,version_base:%s", eventName, hostID.Arch, model, claude.DefaultCLIVersion, claude.DefaultCLIVersion), "message": eventName, "service": "claude-code", "hostname": "claude-code", "env": "external", "model": model, "session_id": session.SessionID, "user_type": "external", "entrypoint": "cli", "is_interactive": "true", "client_type": "cli", "process_metrics": pm, "platform": "darwin", "platform_raw": "darwin", "arch": hostID.Arch, "node_version": fakeNodeVersion(), "version": claude.DefaultCLIVersion, "version_base": claude.DefaultCLIVersion, "build_time": buildTime, "deployment_environment": "unknown-darwin", "vcs": "git", } body, err := json.Marshal([]any{entry}) if err != nil { return } req, err := http.NewRequest("POST", "https://http-intake.logs.us5.datadoghq.com/api/v2/logs", bytes.NewReader(body)) if err != nil { return } req.Header.Set("Accept", "application/json, text/plain, */*") req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", "axios/1.13.6") req.Header.Set("DD-API-KEY", ddAPIKey) resp, err := httpClient.Do(req) if err != nil { return } resp.Body.Close() } // ─── Public API ────────────────────────────────────────── // EmitPreRequest fires pre-request telemetry events for a /v1/messages request. // accountSeed should be a stable identifier for the account (e.g. account ID or OAuth token suffix). // authHeader is the Authorization header value (used for device ID derivation). // authToken is the raw OAuth token (without "Bearer " prefix) for 1P auth. // model is the model name from the request body (e.g. "claude-sonnet-4-6"). // betaHeader is the anthropic-beta header value. func EmitPreRequest(accountSeed, authHeader, authToken, model, betaHeader string) { authSuffix := authHeader if len(authSuffix) > 16 { authSuffix = authSuffix[len(authSuffix)-16:] } deviceID := generateDeviceID(accountSeed + ":" + authSuffix) session := getOrCreateSession(deviceID) session.RequestCount++ if model == "" { model = "claude-sonnet-4-6" } betas := betaHeader if betas == "" { betas = claude.DefaultBetaHeader } session.LastModel = model session.LastBetas = betas session.LastAuthToken = authToken session.LastAuthHeader = authHeader session.LastRequestAt = time.Now() // First request: full startup sequence if session.RequestCount == 1 { hostID := session.HostID baseTime := time.Now() ts := func(offsetMs int) string { return baseTime.Add(time.Duration(offsetMs) * time.Millisecond).UTC().Format(time.RFC3339Nano) } batch1 := []eventWrapper{ buildEvent("tengu_started", session, model, betas, nil, ts(0)), buildEvent("tengu_init", session, model, betas, nil, ts(80+rand.Intn(120))), buildEvent("tengu_ripgrep_availability", session, model, betas, map[string]any{ "ripgrep_available": true, "ripgrep_version": hostID.RipgrepVersion, "ripgrep_path": hostID.RipgrepPath, }, ts(200+rand.Intn(150))), } // MCP connection events mcpOffset := 400 mcpSuccessCount := hostID.McpServerCount - hostID.McpFailCount for i := 0; i < hostID.McpFailCount; i++ { mcpOffset += 100 + rand.Intn(300) batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_failed", session, model, betas, nil, ts(mcpOffset))) } for i := 0; i < mcpSuccessCount; i++ { mcpOffset += 200 + rand.Intn(500) batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_succeeded", session, model, betas, nil, ts(mcpOffset))) } session.RipgrepReported = true go sendTelemetryEvents(batch1, session, authToken) go sendDatadogLog("tengu_started", session, model) go sendDatadogLog("tengu_init", session, model) // Delayed batch (~25-35s later, matches real CLI timing) go func() { time.Sleep(time.Duration(25000+rand.Intn(10000)) * time.Millisecond) sendTelemetryEvents([]eventWrapper{ buildEvent("tengu_session_init", session, model, betas, nil, ""), buildEvent("tengu_context_loaded", session, model, betas, nil, ""), }, session, authToken) }() } // Every request: tengu_api_query (real CLI event name) go sendTelemetryEvents([]eventWrapper{ buildEvent("tengu_api_query", session, model, betas, nil, ""), }, session, authToken) } // EmitPostRequest fires post-request telemetry events after upstream response. func EmitPostRequest(accountSeed, authHeader, authToken, model, betaHeader string, statusCode int) { authSuffix := authHeader if len(authSuffix) > 16 { authSuffix = authSuffix[len(authSuffix)-16:] } deviceID := generateDeviceID(accountSeed + ":" + authSuffix) session := getOrCreateSession(deviceID) if model == "" { model = "claude-sonnet-4-6" } betas := betaHeader if betas == "" { betas = claude.DefaultBetaHeader } session.LastModel = model session.LastBetas = betas session.LastAuthToken = authToken session.LastAuthHeader = authHeader session.LastRequestAt = time.Now() session.LastStatusCode = statusCode // Real CLI uses tengu_api_success on success, tengu_api_error on failure if statusCode < 400 { events := []eventWrapper{ buildEvent("tengu_api_success", session, model, betas, nil, ""), } go sendTelemetryEvents(events, session, authToken) go sendDatadogLog("tengu_api_success", session, model) } else { var errMsg string switch { case statusCode == 429: errMsg = "rate_limit_exceeded" case statusCode == 529: errMsg = "overloaded" case statusCode >= 500: errMsg = "server_error" default: errMsg = "client_error" } errEvent := buildEvent("tengu_api_error", session, model, betas, map[string]any{ "error_type": "TelemetrySafeError", "error_code": statusCode, "error_message": errMsg, }, "") go sendTelemetryEvents([]eventWrapper{errEvent}, session, authToken) go sendDatadogLog("tengu_api_error", session, model) } // Random tool_use event (30% probability, 2-7s delay) if rand.Float64() < 0.3 { go func() { time.Sleep(time.Duration(2000+rand.Intn(5000)) * time.Millisecond) sendTelemetryEvents([]eventWrapper{ buildEvent("tengu_tool_use_success", session, model, betas, nil, ""), }, session, authToken) }() } } // EmitExit fires a real tengu_exit event for an existing session. func EmitExit(accountSeed, authHeader, authToken, model, betaHeader string, extraData map[string]any) { authSuffix := authHeader if len(authSuffix) > 16 { authSuffix = authSuffix[len(authSuffix)-16:] } deviceID := generateDeviceID(accountSeed + ":" + authSuffix) session := getSession(deviceID) if session == nil { return } if model == "" { model = session.LastModel } if model == "" { model = "claude-sonnet-4-6" } if betaHeader == "" { betaHeader = session.LastBetas } if betaHeader == "" { betaHeader = claude.DefaultBetaHeader } if authToken == "" { authToken = session.LastAuthToken } payload := map[string]any{ "last_session_id": session.SessionID, "last_session_duration": time.Since(session.StartTime).Milliseconds(), "last_session_request_count": session.RequestCount, } if session.LastStatusCode > 0 { payload["last_session_status_code"] = session.LastStatusCode } for k, v := range extraData { payload[k] = v } go sendTelemetryEvents([]eventWrapper{ buildEvent("tengu_exit", session, model, betaHeader, payload, ""), }, session, authToken) go sendDatadogLog("tengu_exit", session, model) deleteSession(deviceID) } // Jitter returns a random delay to inject before forwarding a request. // 80% fast (80-300ms exponential), 20% slow (400-1200ms uniform). func Jitter() time.Duration { if rand.Float64() < 0.80 { ms := 80.0 + (-math.Log(rand.Float64()) * 90.0) return time.Duration(ms) * time.Millisecond } ms := 400.0 + rand.Float64()*800.0 return time.Duration(ms) * time.Millisecond }