""" MiniGravity Traffic Capture - mitmproxy addon Captures and categorizes traffic from Claude Code and Antigravity IDE. Records: headers (with ordering), body, TLS info, timing. Usage: # Claude Code (terminal) HTTPS_PROXY=http://127.0.0.1:8080 claude login HTTPS_PROXY=http://127.0.0.1:8080 claude "hello" # Antigravity (VS Code) - set proxy in VS Code settings or env HTTPS_PROXY=http://127.0.0.1:8080 code . # Start mitmproxy with this addon mitmproxy -s capture_traffic.py --set stream_large_bodies=10m # or headless: mitmdump -s capture_traffic.py --set stream_large_bodies=10m Output: ./captures/ - JSON files per request ./captures/_summary.jsonl - One-line-per-request summary ./captures/_report.txt - Human-readable report (generated on exit) """ import json import os import time import hashlib from datetime import datetime, timezone from pathlib import Path from mitmproxy import http, ctx, tls from mitmproxy.net.http.http1.assemble import assemble_request_head # ─── Target domains and classification ─── TARGET_DOMAINS = { # Claude / Anthropic "claude.ai", "platform.claude.com", "api.anthropic.com", # Google / Antigravity "accounts.google.com", "oauth2.googleapis.com", "cloudaicompanion.googleapis.com", "generativelanguage.googleapis.com", # Telemetry "http-intake.logs.us5.datadoghq.com", "sentry.io", } def classify_request(flow: http.HTTPFlow) -> dict: """Classify a request by source tool and purpose.""" host = flow.request.pretty_host path = flow.request.path method = flow.request.method ua = flow.request.headers.get("user-agent", "") # Determine source tool source = "unknown" if "claude-cli" in ua or "claude-code" in ua: source = "claude-cli" elif "node" in ua.lower() and ("stainless" in str(flow.request.headers)): source = "claude-cli" elif "axios" in ua: source = "claude-cli-sdk" elif "vscode" in ua.lower() or "visual studio" in ua.lower(): source = "vscode-extension" elif "electron" in ua.lower(): source = "desktop-app" elif "chrome" in ua.lower() or "safari" in ua.lower() or "mozilla" in ua.lower(): source = "browser" elif "node" in ua.lower(): source = "node-generic" elif "python" in ua.lower(): source = "python-client" elif "go-http" in ua.lower() or "go/" in ua.lower(): source = "go-client" # Determine request purpose purpose = "unknown" # OAuth flows if "/oauth/authorize" in path: purpose = "oauth-authorize" elif "/oauth/token" in path or "/v1/oauth/token" in path: # Distinguish exchange vs refresh body = _get_request_body_str(flow) if "refresh_token" in body: purpose = "oauth-token-refresh" else: purpose = "oauth-token-exchange" elif "/o/oauth2" in path or "/oauth2/" in path: purpose = "google-oauth" # API calls elif "/v1/messages" in path: purpose = "api-messages" elif "/v1/complete" in path: purpose = "api-complete" # Organization / setup elif "/api/organizations" in path: purpose = "org-list" elif "/v1/oauth/" in path and "/authorize" in path: purpose = "oauth-authorize-api" # Telemetry elif "/api/event_logging" in path: purpose = "telemetry-otel" elif "datadoghq.com" in host: purpose = "telemetry-datadog" elif "sentry" in host: purpose = "telemetry-sentry" # Google AI elif "cloudaicompanion" in host: purpose = "antigravity-api" elif "generativelanguage" in host: purpose = "gemini-api" return { "source": source, "purpose": purpose, } def _get_request_body_str(flow: http.HTTPFlow) -> str: """Safely get request body as string.""" try: if flow.request.content: return flow.request.content.decode("utf-8", errors="replace") except Exception: pass return "" def _get_response_body_str(flow: http.HTTPFlow, max_len: int = 4096) -> str: """Safely get response body as string, truncated.""" try: if flow.response and flow.response.content: body = flow.response.content.decode("utf-8", errors="replace") if len(body) > max_len: return body[:max_len] + f"\n... [truncated, total {len(body)} bytes]" return body except Exception: pass return "" def _parse_json_body(body_str: str) -> any: """Try to parse body as JSON, return raw string if fails.""" if not body_str: return None try: return json.loads(body_str) except (json.JSONDecodeError, ValueError): return body_str def _get_tls_info(flow: http.HTTPFlow) -> dict: """Extract available TLS information from the flow.""" info = {} if flow.server_conn and flow.server_conn.tls_version: info["tls_version"] = flow.server_conn.tls_version if flow.server_conn and hasattr(flow.server_conn, "alpn_proto_negotiated"): info["alpn"] = ( flow.server_conn.alpn_proto_negotiated.decode() if flow.server_conn.alpn_proto_negotiated else None ) # Client TLS info (what the client sent to mitmproxy) if flow.client_conn: if hasattr(flow.client_conn, "tls_version") and flow.client_conn.tls_version: info["client_tls_version"] = flow.client_conn.tls_version if ( hasattr(flow.client_conn, "alpn_proto_negotiated") and flow.client_conn.alpn_proto_negotiated ): info["client_alpn"] = flow.client_conn.alpn_proto_negotiated.decode() # SNI if hasattr(flow.client_conn, "sni") and flow.client_conn.sni: info["client_sni"] = flow.client_conn.sni return info class TrafficCapture: def __init__(self): self.capture_dir = Path("./captures") self.capture_dir.mkdir(exist_ok=True) self.summary_file = self.capture_dir / "_summary.jsonl" self.counter = 0 self.captures = [] # Write session start marker session_start = { "event": "session_start", "timestamp": datetime.now(timezone.utc).isoformat(), "note": "New capture session started", } with open(self.summary_file, "a") as f: f.write(json.dumps(session_start) + "\n") ctx.log.info( f"[capture] Traffic capture started. Output: {self.capture_dir.absolute()}" ) def request(self, flow: http.HTTPFlow): """Tag requests to target domains.""" host = flow.request.pretty_host is_target = any(host == d or host.endswith("." + d) for d in TARGET_DOMAINS) flow.metadata["is_target"] = is_target if is_target: flow.metadata["capture_time_start"] = time.time() def response(self, flow: http.HTTPFlow): """Capture complete request/response for target domains.""" if not flow.metadata.get("is_target"): return self.counter += 1 classification = classify_request(flow) elapsed = None if flow.metadata.get("capture_time_start"): elapsed = round(time.time() - flow.metadata["capture_time_start"], 3) # Build ordered header list (order matters for fingerprinting!) request_headers_ordered = [ [k, v] for k, v in flow.request.headers.fields ] request_headers_ordered_decoded = [] for k, v in request_headers_ordered: try: request_headers_ordered_decoded.append( [k.decode("utf-8", errors="replace"), v.decode("utf-8", errors="replace")] ) except AttributeError: request_headers_ordered_decoded.append([str(k), str(v)]) response_headers_ordered = [] if flow.response: for k, v in flow.response.headers.fields: try: response_headers_ordered.append( [k.decode("utf-8", errors="replace"), v.decode("utf-8", errors="replace")] ) except AttributeError: response_headers_ordered.append([str(k), str(v)]) req_body = _get_request_body_str(flow) resp_body = _get_response_body_str(flow) # Redact sensitive values req_body_parsed = _parse_json_body(req_body) if isinstance(req_body_parsed, dict): req_body_parsed = _redact_sensitive(req_body_parsed) resp_body_parsed = _parse_json_body(resp_body) if isinstance(resp_body_parsed, dict): resp_body_parsed = _redact_sensitive(resp_body_parsed) record = { "id": self.counter, "timestamp": datetime.now(timezone.utc).isoformat(), "elapsed_sec": elapsed, # Classification "source": classification["source"], "purpose": classification["purpose"], # Request "request": { "method": flow.request.method, "url": flow.request.pretty_url, "host": flow.request.pretty_host, "path": flow.request.path, "http_version": flow.request.http_version, "headers_ordered": request_headers_ordered_decoded, "body": req_body_parsed, "content_length": len(flow.request.content) if flow.request.content else 0, }, # Response "response": { "status_code": flow.response.status_code if flow.response else None, "http_version": flow.response.http_version if flow.response else None, "headers_ordered": response_headers_ordered, "body": resp_body_parsed, "content_length": ( len(flow.response.content) if flow.response and flow.response.content else 0 ), }, # TLS "tls": _get_tls_info(flow), # Connection "connection": { "client_address": ( f"{flow.client_conn.peername[0]}:{flow.client_conn.peername[1]}" if flow.client_conn.peername else None ), "server_address": ( f"{flow.server_conn.peername[0]}:{flow.server_conn.peername[1]}" if flow.server_conn and flow.server_conn.peername else None ), }, } self.captures.append(record) # Save individual capture file filename = ( f"{self.counter:04d}_{classification['source']}" f"_{classification['purpose']}" f"_{flow.request.pretty_host}.json" ) filepath = self.capture_dir / filename with open(filepath, "w") as f: json.dump(record, f, indent=2, ensure_ascii=False, default=str) # Append to summary summary_line = { "id": self.counter, "ts": datetime.now(timezone.utc).strftime("%H:%M:%S"), "source": classification["source"], "purpose": classification["purpose"], "method": flow.request.method, "url": flow.request.pretty_url[:120], "status": flow.response.status_code if flow.response else None, "ua": flow.request.headers.get("user-agent", "")[:80], "elapsed": elapsed, } with open(self.summary_file, "a") as f: f.write(json.dumps(summary_line) + "\n") # Console output status = flow.response.status_code if flow.response else "???" ctx.log.info( f"[capture #{self.counter}] " f"[{classification['source']}] " f"[{classification['purpose']}] " f"{flow.request.method} {flow.request.pretty_url[:80]} " f"→ {status} " f"({elapsed}s)" ) # Highlight important findings ua = flow.request.headers.get("user-agent", "") if classification["purpose"] in ( "oauth-token-exchange", "oauth-token-refresh", ): ctx.log.warn( f"[capture] TOKEN EXCHANGE/REFRESH detected!\n" f" UA: {ua}\n" f" Headers: {[h[0] for h in request_headers_ordered_decoded]}" ) def done(self): """Generate report on exit.""" if not self.captures: ctx.log.info("[capture] No captures recorded.") return report_path = self.capture_dir / "_report.txt" with open(report_path, "w") as f: f.write("=" * 80 + "\n") f.write(" MiniGravity Traffic Capture Report\n") f.write(f" Generated: {datetime.now().isoformat()}\n") f.write(f" Total requests captured: {len(self.captures)}\n") f.write("=" * 80 + "\n\n") # Group by source by_source = {} for cap in self.captures: src = cap["source"] if src not in by_source: by_source[src] = [] by_source[src].append(cap) for source, caps in sorted(by_source.items()): f.write(f"\n{'─' * 60}\n") f.write(f" Source: {source} ({len(caps)} requests)\n") f.write(f"{'─' * 60}\n\n") # Group by purpose within source by_purpose = {} for cap in caps: p = cap["purpose"] if p not in by_purpose: by_purpose[p] = [] by_purpose[p].append(cap) for purpose, pcaps in sorted(by_purpose.items()): f.write(f" [{purpose}] ({len(pcaps)} requests)\n\n") for cap in pcaps: req = cap["request"] f.write(f" #{cap['id']} {req['method']} {req['url'][:100]}\n") f.write(f" HTTP Version: {req['http_version']}\n") f.write(" Request Headers (ordered):\n") for hdr in req["headers_ordered"]: val = hdr[1] # Truncate long values if len(val) > 100: val = val[:100] + "..." f.write(f" {hdr[0]}: {val}\n") if req["body"]: body_str = json.dumps( req["body"], indent=6, ensure_ascii=False, default=str ) if len(body_str) > 500: body_str = body_str[:500] + "\n ..." f.write(f" Request Body:\n {body_str}\n") resp = cap["response"] f.write(f" Response: {resp['status_code']}\n") if cap["tls"]: f.write(f" TLS: {json.dumps(cap['tls'])}\n") f.write("\n") # Comparison section f.write(f"\n{'=' * 80}\n") f.write(" FINGERPRINT COMPARISON\n") f.write(f"{'=' * 80}\n\n") # Collect unique UA per source+purpose ua_map = {} for cap in self.captures: key = f"{cap['source']}:{cap['purpose']}" ua = dict(cap["request"]["headers_ordered"]).get("user-agent", "N/A") if key not in ua_map: ua_map[key] = set() ua_map[key].add(ua) f.write(" User-Agent by source:purpose\n") for key, uas in sorted(ua_map.items()): for ua in uas: f.write(f" {key:40s} → {ua}\n") # Collect header sets per source+purpose f.write("\n Header names by source:purpose\n") header_map = {} for cap in self.captures: key = f"{cap['source']}:{cap['purpose']}" hdrs = tuple(h[0].lower() for h in cap["request"]["headers_ordered"]) if key not in header_map: header_map[key] = set() header_map[key].add(hdrs) for key, hdr_sets in sorted(header_map.items()): for hdrs in hdr_sets: f.write(f" {key}:\n") for h in hdrs: f.write(f" - {h}\n") f.write("\n") ctx.log.info( f"[capture] Report written to {report_path.absolute()}\n" f"[capture] {len(self.captures)} requests captured in {self.capture_dir.absolute()}" ) def _redact_sensitive(d: dict) -> dict: """Redact sensitive values in a dict, preserving structure.""" sensitive_keys = { "access_token", "refresh_token", "code", "code_verifier", "session_key", "sessionKey", "password", "secret", "authorization", "cookie", } result = {} for k, v in d.items(): if k.lower() in {s.lower() for s in sensitive_keys}: if isinstance(v, str) and len(v) > 8: result[k] = v[:4] + "****" + v[-4:] else: result[k] = "****" elif isinstance(v, dict): result[k] = _redact_sensitive(v) elif isinstance(v, list): result[k] = [ _redact_sensitive(item) if isinstance(item, dict) else item for item in v ] else: result[k] = v return result addons = [TrafficCapture()]