sub2api/antigravity/capture/ja3_extract.py
win 85ed193ff0
Some checks failed
CI / test (push) Failing after 10s
CI / golangci-lint (push) Failing after 6s
Security Scan / backend-security (push) Failing after 8s
Security Scan / frontend-security (push) Failing after 7s
feat(tls): 更新 DoWithTLS 所有调用点至新三模式签名
- DoWithTLS 签名变更:(bool/profile) → (TLSMode, profile)
- 所有调用方传入 account.GetTLSMode() 以支持 node/utls/off 三模式
- gateway_service.go、gemini_messages_compat、forward_as_* 全部更新
- claude_usage_service 的 ClaudeUsageFetchOptions 新增 TLSMode 字段
- 新增 decompressResponseBody(gzip/brotli/deflate)到 http_upstream.go
- 新增 antigravity_privacy_service.go(setAntigravityPrivacy)
- admin_service 新增 ForceOpenAIPrivacy/EnsureAntigravityPrivacy/ForceAntigravityPrivacy
- antigravity.Client 新增 SetUserSettings/FetchUserInfo API
2026-03-27 22:29:17 +08:00

241 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Extract JA3 fingerprint from pcap file (no tshark needed).
Parses TLS ClientHello directly from raw packets.
"""
import struct
import hashlib
import sys
def parse_pcap(filepath):
    """Parse a pcap file and extract TLS ClientHello JA3 fingerprints.

    Only classic pcap files whose packets are Ethernet frames carrying
    IPv4/TCP are examined; every other packet is skipped. A ClientHello is
    recognised only when it starts at the first byte of a TCP payload, and it
    is parsed from that single packet (no TCP stream reassembly).

    Args:
        filepath: Path of the pcap file to read.

    Returns:
        A list with one dict per ClientHello for which ``extract_ja3``
        returned a result, in file order. The list is empty when the file is
        not a pcap file or contains no matching packet.
    """
    results = []
    with open(filepath, 'rb') as f:
        # Global header: magic(4) + version(4) + thiszone(4) + sigfigs(4)
        # + snaplen(4) + network(4) = 24 bytes. Only the magic is used.
        global_hdr = f.read(24)
        if len(global_hdr) < 4:
            print("Not a pcap file (too short to contain a header)")
            return results

        # The magic is read little-endian; seeing it byte-swapped means the
        # file was written big-endian. 0xa1b23c4d is the nanosecond-timestamp
        # variant, whose record layout is otherwise identical.
        magic = struct.unpack_from('<I', global_hdr)[0]
        if magic in (0xa1b2c3d4, 0xa1b23c4d):
            endian = '<'
        elif magic in (0xd4c3b2a1, 0x4d3cb2a1):
            endian = '>'
        else:
            print(f"Not a pcap file (magic: {hex(magic)})")
            return results

        record_hdr = struct.Struct(f'{endian}IIII')
        packet_num = 0
        while True:
            # Per-packet record header: ts_sec, ts_frac, incl_len, orig_len
            pkt_hdr = f.read(record_hdr.size)
            if len(pkt_hdr) < record_hdr.size:
                break
            incl_len = record_hdr.unpack(pkt_hdr)[2]
            pkt_data = f.read(incl_len)
            if len(pkt_data) < incl_len:
                break  # file ends in the middle of a packet
            packet_num += 1

            # Ethernet header (14 bytes); EtherType is at offset 12
            if len(pkt_data) < 14:
                continue
            eth_type = struct.unpack_from('!H', pkt_data, 12)[0]
            if eth_type != 0x0800:  # IPv4
                continue

            # IPv4 header
            ip_start = 14
            if len(pkt_data) < ip_start + 20:
                continue
            ip_ihl = (pkt_data[ip_start] & 0x0F) * 4
            if ip_ihl < 20:
                continue  # malformed: header shorter than the fixed part
            if pkt_data[ip_start + 9] != 6:  # TCP
                continue
            # A non-first IP fragment carries no TCP header, so its payload
            # must not be interpreted as one.
            frag_field = struct.unpack_from('!H', pkt_data, ip_start + 6)[0]
            if frag_field & 0x1FFF:
                continue
            dst_ip = '.'.join(
                str(b) for b in pkt_data[ip_start + 16:ip_start + 20]
            )

            # TCP header
            tcp_start = ip_start + ip_ihl
            if len(pkt_data) < tcp_start + 20:
                continue
            dst_port = struct.unpack_from('!H', pkt_data, tcp_start + 2)[0]
            tcp_data_offset = (pkt_data[tcp_start + 12] >> 4) * 4
            if tcp_data_offset < 20:
                continue  # malformed: data offset inside the fixed header

            # TLS record starts right after the TCP header
            tls_start = tcp_start + tcp_data_offset
            if len(pkt_data) < tls_start + 6:
                continue
            if pkt_data[tls_start] != 22:  # content type 22 = Handshake
                continue

            # Record header is 5 bytes: type(1) + version(2) + length(2)
            hs_start = tls_start + 5
            if len(pkt_data) < hs_start + 4:
                continue
            if pkt_data[hs_start] != 1:  # handshake type 1 = ClientHello
                continue

            # Stay best-effort on malformed hellos, but say which packet was
            # dropped instead of hiding the failure.
            try:
                ja3 = extract_ja3(pkt_data, hs_start, dst_ip, dst_port)
            except (struct.error, IndexError, ValueError) as exc:
                print(
                    f"Warning: skipping malformed ClientHello in packet "
                    f"{packet_num}: {exc}",
                    file=sys.stderr,
                )
                continue
            if ja3:
                results.append(ja3)
    return results
def _is_grease(value):
    """Return True if *value* is one of the 16 RFC 8701 GREASE code points."""
    # GREASE values are 0x0A0A, 0x1A1A, ..., 0xFAFA: two identical bytes whose
    # low nibble is 0xA. Checking the nibbles alone would also match values
    # such as 0x1A2A, which are not GREASE.
    return (value & 0x0f0f) == 0x0a0a and (value >> 8) == (value & 0xff)


def _read_u16_list(buf):
    """Decode *buf* as big-endian uint16 values, ignoring an odd trailing byte."""
    count = len(buf) // 2
    return struct.unpack(f'!{count}H', buf[:count * 2])


def extract_ja3(data, hs_start, dst_ip, dst_port):
    """Extract JA3 components from a ClientHello.

    Args:
        data: Bytes containing the handshake message.
        hs_start: Offset in *data* of the handshake header
            (type(1) + length(3)) of the ClientHello.
        dst_ip: Destination address, copied into the result unchanged.
        dst_port: Destination port, copied into the result unchanged.

    Returns:
        ``None`` when *data* ends before the compression-methods field.
        Otherwise a dict with the keys ``dst_ip``, ``dst_port``, ``sni``,
        ``ja3_hash`` (MD5 hex digest of ``ja3_string``), ``ja3_string``,
        ``tls_version``, ``cipher_count``, ``extension_count``,
        ``supported_versions`` and ``ciphers`` (first 10 cipher suites).

    Note:
        Parsing never reads past ``len(data)``. If the extension block is cut
        short (for example because the ClientHello continues in another
        packet), the fingerprint is built from the extensions that are
        present and will not match the full hello.
    """
    # Handshake header: type(1) + length(3)
    pos = hs_start + 4

    # ClientHello: version(2) + random(32)
    if len(data) < pos + 34:
        return None
    ch_version = struct.unpack_from('!H', data, pos)[0]
    pos += 34  # skip version + random

    # Session ID: length(1) + id
    if len(data) < pos + 1:
        return None
    pos += 1 + data[pos]

    # Cipher suites: length(2) + list of uint16
    if len(data) < pos + 2:
        return None
    cs_len = struct.unpack_from('!H', data, pos)[0]
    pos += 2
    if len(data) < pos + cs_len:
        return None
    cipher_suites = [
        str(cs)
        for cs in _read_u16_list(data[pos:pos + cs_len])
        if not _is_grease(cs)
    ]
    pos += cs_len

    # Compression methods: length(1) + list
    if len(data) < pos + 1:
        return None
    pos += 1 + data[pos]

    # Extensions: total length(2) + repeated [type(2) + length(2) + body]
    extensions = []
    elliptic_curves = []
    ec_point_formats = []
    supported_versions = []
    sni = ""

    if len(data) >= pos + 2:
        ext_total_len = struct.unpack_from('!H', data, pos)[0]
        pos += 2
        # Trust the declared length only as far as bytes actually exist.
        ext_end = min(pos + ext_total_len, len(data))

        while pos + 4 <= ext_end:
            ext_type, ext_len = struct.unpack_from('!HH', data, pos)
            body_start = pos + 4
            pos = body_start + ext_len

            if _is_grease(ext_type):
                continue
            extensions.append(str(ext_type))

            # Inner lists are decoded from this slice only, so an
            # over-declared inner length cannot spill into the next extension.
            body = data[body_start:min(pos, ext_end)]

            if ext_type == 0 and len(body) > 5:
                # SNI: list_len(2) + name_type(1) + name_len(2) + name
                name_len = struct.unpack_from('!H', body, 3)[0]
                sni = body[5:5 + name_len].decode('ascii', errors='replace')
            elif ext_type == 10 and len(body) >= 2:
                # Supported Groups / Elliptic Curves: list_len(2) + uint16s
                curves_len = struct.unpack_from('!H', body, 0)[0]
                elliptic_curves.extend(
                    str(curve)
                    for curve in _read_u16_list(body[2:2 + curves_len])
                    if not _is_grease(curve)
                )
            elif ext_type == 11 and len(body) >= 1:
                # EC Point Formats: list_len(1) + uint8s
                ec_point_formats.extend(str(b) for b in body[1:1 + body[0]])
            elif ext_type == 43 and len(body) >= 1:
                # Supported Versions: list_len(1) + uint16s
                supported_versions.extend(
                    hex(ver)
                    for ver in _read_u16_list(body[1:1 + body[0]])
                    if not _is_grease(ver)
                )

    # Build JA3 string: TLSVersion,Ciphers,Extensions,EllipticCurves,ECPointFormats
    ja3_str = ','.join([
        str(ch_version),
        '-'.join(cipher_suites),
        '-'.join(extensions),
        '-'.join(elliptic_curves),
        '-'.join(ec_point_formats),
    ])
    ja3_hash = hashlib.md5(ja3_str.encode()).hexdigest()

    return {
        'dst_ip': dst_ip,
        'dst_port': dst_port,
        'sni': sni,
        'ja3_hash': ja3_hash,
        'ja3_string': ja3_str,
        'tls_version': hex(ch_version),
        'cipher_count': len(cipher_suites),
        'extension_count': len(extensions),
        'supported_versions': supported_versions,
        'ciphers': cipher_suites[:10],  # first 10 for display
    }
if __name__ == '__main__':
    # Command-line entry point: print one report per distinct
    # (JA3 hash, SNI) pair found in the pcap file named by the first argument.
    if len(sys.argv) < 2:
        print("Usage: python3 ja3_extract.py <pcap_file>")
        sys.exit(1)

    results = parse_pcap(sys.argv[1])
    if not results:
        print("No TLS ClientHello found in pcap")
        sys.exit(0)

    # Deduplicate by JA3 hash + SNI
    seen = set()
    for r in results:
        key = (r['ja3_hash'], r['sni'])
        if key in seen:
            continue
        seen.add(key)

        # Add the ellipsis only when the string really was cut, so a short
        # JA3 string is not presented as if it had been truncated.
        ja3_full = r['ja3_string']
        if len(ja3_full) > 200:
            ja3_full = ja3_full[:200] + "..."

        print(f"SNI: {r['sni']}")
        print(f"Dest: {r['dst_ip']}:{r['dst_port']}")
        print(f"JA3 Hash: {r['ja3_hash']}")
        print(f"TLS Ver: {r['tls_version']}")
        print(f"Ciphers: {r['cipher_count']} suites (first 10: {r['ciphers']})")
        print(f"Extensions: {r['extension_count']}")
        print(f"Sup. Vers: {r['supported_versions']}")
        print(f"JA3 Full: {ja3_full}")
        print()