feat: 遥测模拟 — 模拟 Claude CLI 的 event_logging + DataDog 日志
Some checks failed
CI / test (push) Failing after 4s
CI / golangci-lint (push) Failing after 5s
Security Scan / backend-security (push) Failing after 6s
Security Scan / frontend-security (push) Failing after 5s

基于真实 Claude CLI 2.1.81 抓包数据实现:
- POST api.anthropic.com/api/event_logging/batch(请求前后自动发送)
- POST http-intake.logs.us5.datadoghq.com/api/v2/logs
- 事件类型:tengu_started, tengu_init, tengu_api_request_started/completed
- 每个账号独立 session_id + device_id
- process_metrics base64 编码(匹配真实格式)
- 可通过 TELEMETRY_ENABLED=false 关闭
This commit is contained in:
win 2026-03-22 13:06:24 +08:00
parent 3f93d5d7bf
commit eb6bee0137

View File

@ -4,6 +4,7 @@ const http = require('http');
const https = require('https');
const http2 = require('http2');
const net = require('net');
const crypto = require('crypto');
// ─── 配置 ───────────────────────────────────────────────
const UPSTREAM_HOST = process.env.UPSTREAM_HOST || 'api.anthropic.com';
@ -12,6 +13,10 @@ const LISTEN_HOST = process.env.PROXY_HOST || '127.0.0.1';
const UPSTREAM_PROXY = process.env.UPSTREAM_PROXY || '';
const CONNECT_TIMEOUT = parseInt(process.env.CONNECT_TIMEOUT || '30000', 10);
const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '600000', 10);
const TELEMETRY_ENABLED = process.env.TELEMETRY_ENABLED !== 'false'; // 默认开启
const DD_API_KEY = process.env.DD_API_KEY || 'pubbbf48e6d78dae54bceaa4acf463299bf';
const CLI_VERSION = process.env.CLI_VERSION || '2.1.81';
const BUILD_TIME = process.env.BUILD_TIME || '2026-03-20T21:26:18Z';
const log = (level, msg, extra = {}) => {
const entry = { time: new Date().toISOString(), level, msg, ...extra };
@ -19,8 +24,229 @@ const log = (level, msg, extra = {}) => {
};
const HEALTH_PATH = '/__health';
const h2Hosts = new Set(); // 已知需要 H2 的主机
const h2Sessions = new Map(); // H2 session 缓存
const h2Hosts = new Set();
const h2Sessions = new Map();
// ─── 遥测模拟 ────────────────────────────────────────────
// 每个 device_id 的会话状态
const sessionStates = new Map();
function getOrCreateSession(deviceId) {
if (sessionStates.has(deviceId)) return sessionStates.get(deviceId);
const state = {
sessionId: crypto.randomUUID(),
deviceId,
startTime: Date.now(),
requestCount: 0,
};
sessionStates.set(deviceId, state);
return state;
}
function generateDeviceId(accountSeed) {
return crypto.createHash('sha256').update(`device:${accountSeed}`).digest('hex');
}
function buildEnvBlock() {
return {
platform: 'linux',
node_version: process.version,
terminal: 'xterm-256color',
package_managers: 'npm',
runtimes: 'node',
is_running_with_bun: false,
is_ci: false,
is_claubbit: false,
is_github_action: false,
is_claude_code_action: false,
is_claude_ai_auth: false,
version: CLI_VERSION,
arch: 'x64',
is_claude_code_remote: false,
deployment_environment: 'unknown-linux',
is_conductor: false,
version_base: CLI_VERSION,
build_time: BUILD_TIME,
is_local_agent_mode: false,
vcs: 'git',
platform_raw: 'linux',
};
}
function buildProcessMetrics(uptime) {
const rss = 200_000_000 + Math.floor(Math.random() * 100_000_000);
return Buffer.from(JSON.stringify({
uptime,
rss,
heapTotal: 30_000_000 + Math.floor(Math.random() * 5_000_000),
heapUsed: 40_000_000 + Math.floor(Math.random() * 20_000_000),
external: 14_000_000 + Math.floor(Math.random() * 2_000_000),
arrayBuffers: Math.floor(Math.random() * 10_000),
constrainedMemory: 0,
cpuUsage: { user: 100_000 + Math.floor(Math.random() * 300_000), system: 20_000 + Math.floor(Math.random() * 80_000) },
cpuPercent: Math.random() * 200,
})).toString('base64');
}
function buildEvent(eventName, session, model, betas) {
const uptime = (Date.now() - session.startTime) / 1000;
return {
event_type: 'ClaudeCodeInternalEvent',
event_data: {
event_name: eventName,
client_timestamp: new Date().toISOString(),
model: model || 'claude-sonnet-4-6',
session_id: session.sessionId,
user_type: 'external',
betas: betas || 'claude-code-20250219,interleaved-thinking-2025-05-14',
env: buildEnvBlock(),
entrypoint: 'cli',
is_interactive: true,
client_type: 'cli',
process: buildProcessMetrics(uptime),
event_id: crypto.randomUUID(),
device_id: session.deviceId,
},
};
}
// 发送遥测到 api.anthropic.com/api/event_logging/batch
function sendTelemetryEvents(events) {
if (!TELEMETRY_ENABLED || events.length === 0) return;
const body = JSON.stringify({ events });
const opts = {
hostname: 'api.anthropic.com',
port: 443,
path: '/api/event_logging/batch',
method: 'POST',
headers: {
'Accept': 'application/json, text/plain, */*',
'Content-Type': 'application/json',
'User-Agent': `claude-code/${CLI_VERSION}`,
'x-service-name': 'claude-code',
'Content-Length': Buffer.byteLength(body),
},
timeout: 10000,
};
const req = https.request(opts, (res) => {
res.resume(); // drain
log('debug', 'telemetry_sent', { status: res.statusCode, events: events.length });
});
req.on('error', (err) => {
log('debug', 'telemetry_error', { error: err.message });
});
req.on('timeout', () => req.destroy());
req.end(body);
}
// 发送 DataDog 日志
function sendDatadogLog(eventName, session, model) {
if (!TELEMETRY_ENABLED) return;
const uptime = (Date.now() - session.startTime) / 1000;
const entry = {
ddsource: 'nodejs',
ddtags: `event:${eventName},arch:x64,client_type:cli,model:${model || 'claude-sonnet-4-6'},platform:linux,user_type:external,version:${CLI_VERSION},version_base:${CLI_VERSION}`,
message: eventName,
service: 'claude-code',
hostname: 'claude-code',
env: 'external',
model: model || 'claude-sonnet-4-6',
session_id: session.sessionId,
user_type: 'external',
entrypoint: 'cli',
is_interactive: 'true',
client_type: 'cli',
process_metrics: {
uptime,
rss: 200_000_000 + Math.floor(Math.random() * 100_000_000),
heapTotal: 30_000_000 + Math.floor(Math.random() * 5_000_000),
heapUsed: 40_000_000 + Math.floor(Math.random() * 20_000_000),
external: 14_000_000 + Math.floor(Math.random() * 2_000_000),
arrayBuffers: Math.floor(Math.random() * 10_000),
constrainedMemory: 0,
cpuUsage: { user: 100_000 + Math.floor(Math.random() * 300_000), system: 20_000 + Math.floor(Math.random() * 80_000) },
},
platform: 'linux',
platform_raw: 'linux',
arch: 'x64',
node_version: process.version,
version: CLI_VERSION,
version_base: CLI_VERSION,
build_time: BUILD_TIME,
deployment_environment: 'unknown-linux',
vcs: 'git',
};
const body = JSON.stringify([entry]);
const opts = {
hostname: 'http-intake.logs.us5.datadoghq.com',
port: 443,
path: '/api/v2/logs',
method: 'POST',
headers: {
'Accept': 'application/json, text/plain, */*',
'Content-Type': 'application/json',
'User-Agent': 'axios/1.13.6',
'dd-api-key': DD_API_KEY,
'Content-Length': Buffer.byteLength(body),
},
timeout: 10000,
};
const req = https.request(opts, (res) => { res.resume(); });
req.on('error', () => {});
req.on('timeout', () => req.destroy());
req.end(body);
}
// 请求前发遥测(模拟 CLI 启动 + 初始化事件)
function emitPreRequestTelemetry(reqHeaders) {
const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
const deviceId = generateDeviceId(accountSeed + ':' + (reqHeaders['authorization'] || '').slice(-16));
const session = getOrCreateSession(deviceId);
session.requestCount++;
const model = 'claude-sonnet-4-6';
const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,interleaved-thinking-2025-05-14';
// 首次请求:发 tengu_started + tengu_init
if (session.requestCount === 1) {
const events = [
buildEvent('tengu_started', session, model, betas),
buildEvent('tengu_init', session, model, betas),
buildEvent('tengu_mcp_server_connection_succeeded', session, model, betas),
];
sendTelemetryEvents(events);
sendDatadogLog('tengu_started', session, model);
sendDatadogLog('tengu_init', session, model);
}
// 每次请求:发 request 相关事件
const events = [
buildEvent('tengu_api_request_started', session, model, betas),
];
sendTelemetryEvents(events);
}
// 请求后发遥测
function emitPostRequestTelemetry(reqHeaders, statusCode) {
const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
const deviceId = generateDeviceId(accountSeed + ':' + (reqHeaders['authorization'] || '').slice(-16));
const session = getOrCreateSession(deviceId);
const model = 'claude-sonnet-4-6';
const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,interleaved-thinking-2025-05-14';
const events = [
buildEvent('tengu_api_request_completed', session, model, betas),
];
sendTelemetryEvents(events);
sendDatadogLog('tengu_api_request_completed', session, model);
}
// ─── H2 session 管理 ────────────────────────────────────
function getOrCreateH2Session(host) {
@ -87,7 +313,7 @@ function collectBody(req) {
}
// ─── H1 代理 ─────────────────────────────────────────────
function sendViaH1(targetHost, method, path, reqHeaders, body, res) {
function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
return new Promise((resolve) => {
const headers = { ...reqHeaders, host: targetHost };
['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'].forEach(h => delete headers[h]);
@ -104,13 +330,17 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res) {
delete rh['connection']; delete rh['keep-alive'];
res.writeHead(proxyRes.statusCode, rh);
proxyRes.pipe(res, { end: true });
// 请求完成后发遥测
if (path.includes('/v1/messages') && savedHeaders) {
emitPostRequestTelemetry(savedHeaders, proxyRes.statusCode);
}
resolve('ok');
});
proxyReq.on('error', (err) => {
if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) {
log('info', 'h1_rejected_switching_to_h2', { host: targetHost });
h2Hosts.add(targetHost);
sendViaH2(targetHost, method, path, reqHeaders, body, res).then(() => resolve('h2'));
sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders).then(() => resolve('h2'));
return;
}
log('error', 'h1_error', { error: err.message, host: targetHost, path });
@ -132,7 +362,7 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res) {
}
// ─── H2 代理 ─────────────────────────────────────────────
async function sendViaH2(targetHost, method, path, reqHeaders, body, res) {
async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
try {
const session = getOrCreateH2Session(targetHost);
await waitForConnect(session);
@ -160,6 +390,9 @@ async function sendViaH2(targetHost, method, path, reqHeaders, body, res) {
res.writeHead(status, rh);
stream.on('data', (c) => res.write(c));
stream.on('end', () => res.end());
if (path.includes('/v1/messages') && savedHeaders) {
emitPostRequestTelemetry(savedHeaders, status);
}
});
stream.on('error', (err) => {
@ -193,12 +426,22 @@ async function proxyRequest(req, res) {
const targetHost = req.headers['x-forwarded-host'] || UPSTREAM_HOST;
log('info', 'proxy_request', { host: targetHost, method: req.method, path: req.url });
// 保存原始 headers 用于遥测
const savedHeaders = { ...req.headers };
const body = await collectBody(req);
// 请求前发遥测(仅 /v1/messages 请求)
if (req.url.includes('/v1/messages') && TELEMETRY_ENABLED) {
emitPreRequestTelemetry(savedHeaders);
// 随机延迟 50-200ms 模拟真实 CLI 行为
await new Promise(r => setTimeout(r, 50 + Math.floor(Math.random() * 150)));
}
if (h2Hosts.has(targetHost)) {
await sendViaH2(targetHost, req.method, req.url, req.headers, body, res);
await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
} else {
await sendViaH1(targetHost, req.method, req.url, req.headers, body, res);
await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
}
}
@ -206,7 +449,11 @@ async function proxyRequest(req, res) {
const server = http.createServer((req, res) => {
if (req.url === HEALTH_PATH) {
res.writeHead(200, { 'content-type': 'application/json' });
res.end(JSON.stringify({ status: 'ok', node: process.version, openssl: process.versions.openssl, uptime: process.uptime(), h2Hosts: [...h2Hosts] }));
res.end(JSON.stringify({
status: 'ok', node: process.version, openssl: process.versions.openssl,
uptime: process.uptime(), h2Hosts: [...h2Hosts],
telemetry: TELEMETRY_ENABLED, sessions: sessionStates.size,
}));
return;
}
proxyRequest(req, res).catch((err) => {
@ -219,9 +466,20 @@ server.timeout = 0;
server.keepAliveTimeout = IDLE_TIMEOUT;
server.headersTimeout = 60000;
server.listen(LISTEN_PORT, LISTEN_HOST, () => {
log('info', 'node-tls-proxy started', { listen: `${LISTEN_HOST}:${LISTEN_PORT}`, node: process.version, openssl: process.versions.openssl });
log('info', 'node-tls-proxy started', {
listen: `${LISTEN_HOST}:${LISTEN_PORT}`, node: process.version, openssl: process.versions.openssl,
telemetry: TELEMETRY_ENABLED,
});
});
// 定期清理过期 session1 小时无活动)
setInterval(() => {
const now = Date.now();
for (const [id, state] of sessionStates) {
if (now - state.startTime > 3600_000) sessionStates.delete(id);
}
}, 300_000);
let stopping = false;
function shutdown(sig) {
if (stopping) return; stopping = true;