// E2E 验证工具:对真实 Antigravity 账号验证本轮优化的 4 项功能。 // // 用法(凭据通过环境变量传入,避免提交到仓库): // // export ANTIGRAVITY_E2E_ACCESS_TOKEN=ya29.... // export ANTIGRAVITY_E2E_REFRESH_TOKEN=1//... // export ANTIGRAVITY_E2E_PROJECT_ID=mega-rhythm-890z1 // export ANTIGRAVITY_E2E_PROXY=socks5://user:pwd@host:port # 可选 // go run ./cmd/test_antigravity_e2e // // 验证目标: // 1. 动态 UA:拉取的 antigravity/<最新版> / // 2. Token 端点 UA:用 refresh_token 换新 token,确认 Go-http-client/2.0 不被拒 // 3. LoadCodeAssist 余额提取:paidTier.availableCredits 写入账号 Extra // 4. 业务请求 + 图像生成 requestId 形态对比 package main import ( "context" "crypto/tls" "encoding/json" "fmt" "io" "net/http" "os" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" "github.com/Wei-Shaw/sub2api/internal/pkg/proxyurl" "github.com/Wei-Shaw/sub2api/internal/pkg/proxyutil" "github.com/Wei-Shaw/sub2api/internal/pkg/tlsfingerprint" ) func main() { accessToken := mustEnv("ANTIGRAVITY_E2E_ACCESS_TOKEN") refreshToken := mustEnv("ANTIGRAVITY_E2E_REFRESH_TOKEN") projectID := mustEnv("ANTIGRAVITY_E2E_PROJECT_ID") proxyURL := os.Getenv("ANTIGRAVITY_E2E_PROXY") ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() step("1/5", "动态版本号 UA") // 触发后台拉取再读取一次(首次 init 已启动,等 fetcher 拿到值) time.Sleep(3 * time.Second) fmt.Printf(" GetUserAgent() = %q\n", antigravity.GetUserAgent()) client, err := antigravity.NewClient(proxyURL) if err != nil { fail("create client: %v", err) } step("2/5", "Token 端点 UA:RefreshToken 验证 Go-http-client/2.0 通过") tokenResp, err := client.RefreshToken(ctx, refreshToken, false) if err != nil { fail("refresh failed: %v", err) } fmt.Printf(" new access_token len=%d, expires_in=%d\n", len(tokenResp.AccessToken), tokenResp.ExpiresIn) if tokenResp.AccessToken != "" { accessToken = tokenResp.AccessToken } step("3/5", "LoadCodeAssist:提取 paidTier.availableCredits 余额") loadResp, _, err := client.LoadCodeAssist(ctx, accessToken) if err != nil { fail("loadCodeAssist failed: %v", err) } fmt.Printf(" project=%q tier=%q\n", loadResp.CloudAICompanionProject, loadResp.GetTier()) credits := loadResp.GetAvailableCredits() fmt.Printf(" credits 条数=%d\n", len(credits)) for _, c := range credits { fmt.Printf(" type=%s amount=%s minimum=%s\n", c.CreditType, c.CreditAmount, c.MinimumCreditAmountForUsage) } step("4/5", "构造普通请求 payload,验证 requestId=agent-") normalBody := buildPayload("claude-sonnet-4-5", projectID) checkRequestIDPrefix(normalBody, "agent-", false) step("5/5", "构造图像生成请求 payload,验证 requestId=image_gen///12") imgBody := buildPayload("gemini-3.1-flash-image", projectID) checkRequestIDPrefix(imgBody, "image_gen/", true) step("✓", "实际发送一次普通对话验证上游 200(走 SOCKS5 代理)") if err := sendOnceAndCheck(ctx, accessToken, projectID, proxyURL); err != nil { fmt.Printf(" [WARN] 上游返回非 200:%v(可能因模型/配额限制,不影响 UA/路由验证)\n", err) } else { fmt.Printf(" 上游 200 OK\n") } _ = client fmt.Println("\nE2E 验证完成。") } func step(idx, desc string) { fmt.Printf("\n[%s] %s\n", idx, desc) } func fail(format string, args ...any) { fmt.Fprintf(os.Stderr, "FAIL: "+format+"\n", args...) os.Exit(1) } func mustEnv(name string) string { v := strings.TrimSpace(os.Getenv(name)) if v == "" { fail("missing env %s", name) } return v } func buildPayload(model, projectID string) []byte { return buildPayloadWithCredits(model, projectID, false) } func buildPayloadWithCredits(model, projectID string, enableCredits bool) []byte { req := &antigravity.ClaudeRequest{ Model: model, MaxTokens: 16, Messages: []antigravity.ClaudeMessage{ {Role: "user", Content: json.RawMessage(`[{"type":"text","text":"Reply with exactly one word: OK"}]`)}, }, } opts := antigravity.DefaultTransformOptions() opts.EnableAICredits = enableCredits // 与 acct_test 工具对齐:关闭 identity patch,发最简 payload opts.EnableIdentityPatch = false opts.EnableMCPXML = false body, err := antigravity.TransformClaudeToGeminiWithOptions(req, projectID, model, opts) if err != nil { fail("transform: %v", err) } return body } func checkRequestIDPrefix(body []byte, wantPrefix string, mustHaveImageGenSuffix bool) { var v antigravity.V1InternalRequest if err := json.Unmarshal(body, &v); err != nil { fail("unmarshal: %v", err) } fmt.Printf(" requestId = %q\n", v.RequestID) fmt.Printf(" requestType = %q\n", v.RequestType) if !strings.HasPrefix(v.RequestID, wantPrefix) { fail("requestId 应以 %q 开头", wantPrefix) } if mustHaveImageGenSuffix { parts := strings.Split(v.RequestID, "/") if len(parts) != 4 || parts[3] != "12" { fail("image_gen requestId 格式错误: %s", v.RequestID) } } } func sendOnceAndCheck(ctx context.Context, accessToken, projectID, proxyURL string) error { // 启用 enabledCreditTypes=["GOOGLE_ONE_AI"],让请求落到付费 credits(账号有 102 GOOGLE_ONE_AI 余额) body := buildPayloadWithCredits("gemini-2.5-flash", projectID, true) fmt.Printf(" payload (with credits): %s\n", abbreviate(string(body))) // 三级 URL fallback 实测:prod → daily → sandbox 任一个 200 即通过 urls := antigravity.BaseURLs if len(urls) == 0 { return fmt.Errorf("no forward base URLs") } hc := newProxyHTTPClient(proxyURL) var lastErr error for _, baseURL := range urls { req, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, "generateContent", accessToken, body) if err != nil { return err } resp, err := hc.Do(req) if err != nil { lastErr = err fmt.Printf(" %s → 网络错误:%v\n", baseURL, err) continue } respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024)) _ = resp.Body.Close() fmt.Printf(" %s → HTTP %d\n", baseURL, resp.StatusCode) fmt.Printf(" body: %s\n", string(respBody)) if resp.StatusCode == http.StatusOK { return nil } lastErr = fmt.Errorf("status=%d", resp.StatusCode) } return lastErr } func abbreviate(s string) string { if len(s) > 200 { return s[:100] + "...[truncated]..." + s[len(s)-50:] } return s } // newProxyHTTPClient 构造一个走 SOCKS5 代理 + Node.js TLS 指纹的 http.Client。 // 与生产路径一致:utls Node.js 24.x 指纹,避免 Google 把裸 Go ClientHello 限流。 func newProxyHTTPClient(proxyURL string) *http.Client { hc := &http.Client{Timeout: 60 * time.Second} profile := &tlsfingerprint.Profile{Name: "claude_cli_builtin", EnableGREASE: true} transport := &http.Transport{ ForceAttemptHTTP2: false, TLSNextProto: map[string]func(string, *tls.Conn) http.RoundTripper{}, ResponseHeaderTimeout: 30 * time.Second, } _, parsed, err := proxyurl.Parse(proxyURL) if err == nil && parsed != nil { switch parsed.Scheme { case "socks5", "socks5h": d := tlsfingerprint.NewSOCKS5ProxyDialer(profile, parsed) transport.DialTLSContext = d.DialTLSContext case "http", "https": d := tlsfingerprint.NewHTTPProxyDialer(profile, parsed) transport.DialTLSContext = d.DialTLSContext default: d := tlsfingerprint.NewDialer(profile, nil) transport.DialTLSContext = d.DialTLSContext _ = proxyutil.ConfigureTransportProxy(transport, parsed) } } else { d := tlsfingerprint.NewDialer(profile, nil) transport.DialTLSContext = d.DialTLSContext } hc.Transport = transport return hc }