sub2api/backend/internal/service/windsurf_refresh_service.go
win 21325afb33
Some checks failed
CI / test (push) Failing after 10s
CI / frontend (push) Failing after 8s
CI / golangci-lint (push) Failing after 5s
Security Scan / backend-security (push) Failing after 5s
Security Scan / frontend-security (push) Failing after 4s
feat(windsurf): 补全ops日志记录与endpoint派生,对齐其他平台
- windsurf_gateway_service: 添加上游延迟/TTFT/错误上下文记录
- endpoint: DeriveUpstreamEndpoint 添加 PlatformWindsurf 分支
- ops_error_logger: guessPlatformFromPath 添加 /windsurf/ 识别
2026-04-23 20:46:27 +08:00

274 lines
6.5 KiB
Go

package service
import (
"context"
"log/slog"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/domain"
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
)
// WindsurfRefreshService runs background loops that periodically scan
// Windsurf accounts and refresh their tokens and user-status metadata.
type WindsurfRefreshService struct {
	// cfg supplies the refresh settings (intervals, concurrency, base URL).
	cfg config.WindsurfConfig
	// accountRepo lists Windsurf accounts and persists refreshed data.
	accountRepo AccountRepository
	// proxyRepo resolves an account's proxy, when one is assigned.
	proxyRepo ProxyRepository
	// authClient performs the token refresh and re-registration calls.
	authClient *windsurf.AuthClient

	// stopCh is closed by Stop to tell the loops to exit.
	stopCh chan struct{}
	// wg tracks the loop goroutines started by Start so Stop can wait for them.
	wg sync.WaitGroup
}
// NewWindsurfRefreshService builds a refresh service from its configuration
// and collaborators. No goroutine is started here; call Start to begin the
// background loops.
func NewWindsurfRefreshService(
	cfg config.WindsurfConfig,
	accountRepo AccountRepository,
	proxyRepo ProxyRepository,
	authClient *windsurf.AuthClient,
) *WindsurfRefreshService {
	svc := new(WindsurfRefreshService)
	svc.cfg = cfg
	svc.accountRepo = accountRepo
	svc.proxyRepo = proxyRepo
	svc.authClient = authClient
	// Unbuffered signalling channel: it is only ever closed, never sent on.
	svc.stopCh = make(chan struct{})
	return svc
}
// Start launches the background refresh loops.
//
// It does nothing (beyond logging) when refreshing is disabled in the
// configuration. The token loop always runs, falling back to a 5-minute
// interval when none is configured; the status loop runs only when its
// interval is positive.
func (s *WindsurfRefreshService) Start() {
	if !s.cfg.Refresh.Enabled {
		slog.Info("windsurf_refresh_disabled")
		return
	}

	// spawn runs one loop in its own goroutine, tracked by the wait group so
	// that Stop can wait for it to finish.
	spawn := func(loop func(time.Duration), every time.Duration) {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			loop(every)
		}()
	}

	tokenEvery := s.cfg.Refresh.TokenScanInterval
	if tokenEvery <= 0 {
		tokenEvery = 5 * time.Minute
	}
	spawn(s.tokenRefreshLoop, tokenEvery)

	statusEvery := s.cfg.Refresh.StatusRefreshInterval
	if statusEvery > 0 {
		spawn(s.statusRefreshLoop, statusEvery)
	}

	slog.Info("windsurf_refresh_started",
		"token_interval", tokenEvery,
		"status_interval", statusEvery,
	)
}
// Stop signals the background loops to exit and blocks until they have
// returned.
//
// Calling Stop again after it has returned is a no-op instead of panicking on
// a second close of the stop channel. Stop is still not meant to be called
// from several goroutines at once: the closed-check and the close are not
// atomic.
func (s *WindsurfRefreshService) Stop() {
	select {
	case <-s.stopCh:
		// Already closed by an earlier Stop; nothing left to signal.
	default:
		close(s.stopCh)
	}
	s.wg.Wait()
}
// tokenRefreshLoop runs a token scan on every tick of the given interval and
// returns once the stop channel is closed. The first scan happens after one
// full interval has elapsed, not immediately.
func (s *WindsurfRefreshService) tokenRefreshLoop(interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			s.scanAndRefreshTokens()
		case <-s.stopCh:
			return
		}
	}
}
// statusRefreshLoop runs a user-status scan on every tick of the given
// interval and returns once the stop channel is closed. The first scan happens
// after one full interval has elapsed, not immediately.
func (s *WindsurfRefreshService) statusRefreshLoop(interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			s.scanAndRefreshStatus()
		case <-s.stopCh:
			return
		}
	}
}
// scanAndRefreshTokens lists all Windsurf accounts and refreshes the token of
// every account that needs it, using at most WorkerConcurrency workers at a
// time (default 4). The whole scan shares one 2-minute deadline.
//
// Only accounts whose credentials report NeedsRefresh within the configured
// RefreshBeforeExpiry window (default 10 minutes), use the "firebase" auth
// method and carry a refresh token are processed.
//
// A worker slot is acquired before its goroutine is started, so the number of
// live goroutines never exceeds the concurrency limit. Dispatching stops as
// soon as the scan deadline passes or Stop is called; refreshes that are
// already running are allowed to finish.
func (s *WindsurfRefreshService) scanAndRefreshTokens() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	accounts, err := s.accountRepo.ListByPlatform(ctx, domain.PlatformWindsurf)
	if err != nil {
		slog.Error("windsurf_refresh_list_failed", "error", err)
		return
	}

	beforeExpiry := s.cfg.Refresh.RefreshBeforeExpiry
	if beforeExpiry <= 0 {
		beforeExpiry = 10 * time.Minute
	}
	concurrency := s.cfg.Refresh.WorkerConcurrency
	if concurrency <= 0 {
		concurrency = 4
	}

	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

dispatch:
	for i := range accounts {
		// Per-iteration copy: the worker goroutine keeps a pointer to it.
		acct := accounts[i]
		creds := LoadWindsurfCredentials(acct.Credentials)
		if !creds.NeedsRefresh(beforeExpiry) {
			continue
		}
		if creds.AuthMethod != "firebase" || creds.RefreshToken == "" {
			continue
		}

		// Take a slot before spawning. Once the deadline has passed, any
		// further refresh would only fail against the dead context, so the
		// remaining accounts are left for the next scan.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			slog.Warn("windsurf_refresh_scan_aborted", "error", ctx.Err())
			break dispatch
		case <-s.stopCh:
			break dispatch
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			s.refreshOneToken(ctx, &acct, creds)
		}()
	}
	wg.Wait()
}
// refreshOneToken refreshes the Firebase ID token of one account, attempts to
// re-register with Codeium to obtain a fresh API key, and persists the
// updated credentials and refresh bookkeeping.
//
// If the token refresh fails, the account's TokenRefreshFailures counter is
// incremented and saved, and nothing else changes. If only the re-registration
// fails, the new tokens are still stored and the existing API key is kept.
//
// account is mutated in place (Credentials and Extra); creds is a by-value
// copy of the credentials decoded by the caller.
func (s *WindsurfRefreshService) refreshOneToken(ctx context.Context, account *Account, creds WindsurfCredentials) {
	proxyURL := ""
	if account.ProxyID != nil {
		proxy, err := s.proxyRepo.GetByID(ctx, *account.ProxyID)
		if err != nil {
			// Best-effort fallback to a direct connection is preserved, but it
			// is now logged instead of happening silently.
			// NOTE(review): confirm that bypassing the assigned proxy is acceptable.
			slog.Warn("windsurf_refresh_proxy_lookup_failed",
				"account_id", account.ID, "proxy_id", *account.ProxyID, "error", err)
		} else {
			proxyURL = proxy.URL()
		}
	}

	result, err := s.authClient.RefreshFirebaseToken(ctx, creds.RefreshToken, proxyURL)
	if err != nil {
		slog.Warn("windsurf_token_refresh_failed", "account_id", account.ID, "error", err)
		extra := LoadWindsurfExtra(account.Extra)
		extra.Refresh.TokenRefreshFailures++
		account.Extra = StoreWindsurfExtra(extra)
		if saveErr := s.accountRepo.Update(ctx, account); saveErr != nil {
			slog.Error("windsurf_refresh_failure_save_failed", "account_id", account.ID, "error", saveErr)
		}
		return
	}

	// One timestamp for everything derived from the refresh response, so the
	// stored values are mutually consistent.
	refreshedAt := time.Now()
	creds.IDToken = result.IDToken
	if result.RefreshToken != "" {
		// Never replace a working refresh token with an empty one: the scan
		// skips accounts without a refresh token, so the account would stop
		// being refreshed for good.
		creds.RefreshToken = result.RefreshToken
	}
	creds.ExpiresAt = refreshedAt.Add(time.Duration(result.ExpiresIn) * time.Second).Format(time.RFC3339)
	creds.LastRefreshAt = refreshedAt.Format(time.RFC3339)

	regResult, err := s.authClient.ReRegisterWithCodeium(ctx, result.IDToken, proxyURL)
	if err != nil {
		// Non-fatal: the refreshed tokens are still worth saving.
		slog.Warn("windsurf_reregister_failed", "account_id", account.ID, "error", err)
	} else {
		creds.APIKey = regResult.APIKey
		creds.LastReregisterAt = time.Now().Format(time.RFC3339)
	}
	account.Credentials = StoreWindsurfCredentials(creds)

	extra := LoadWindsurfExtra(account.Extra)
	extra.Refresh.LastTokenRefreshAt = refreshedAt.Format(time.RFC3339)
	extra.Refresh.TokenRefreshFailures = 0
	account.Extra = StoreWindsurfExtra(extra)

	if err := s.accountRepo.Update(ctx, account); err != nil {
		slog.Error("windsurf_refresh_save_failed", "account_id", account.ID, "error", err)
	}
}
// scanAndRefreshStatus lists all Windsurf accounts and refreshes the user
// status of every account that has an API key, using at most
// WorkerConcurrency workers at a time (default 4). The whole scan shares one
// 2-minute deadline.
//
// A worker slot is acquired before its goroutine is started, so the number of
// live goroutines never exceeds the concurrency limit. Dispatching stops as
// soon as the scan deadline passes or Stop is called; refreshes that are
// already running are allowed to finish.
func (s *WindsurfRefreshService) scanAndRefreshStatus() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	accounts, err := s.accountRepo.ListByPlatform(ctx, domain.PlatformWindsurf)
	if err != nil {
		slog.Error("windsurf_status_refresh_list_failed", "error", err)
		return
	}

	concurrency := s.cfg.Refresh.WorkerConcurrency
	if concurrency <= 0 {
		concurrency = 4
	}

	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

dispatch:
	for i := range accounts {
		// Per-iteration copy: the worker goroutine keeps a pointer to it.
		acct := accounts[i]
		creds := LoadWindsurfCredentials(acct.Credentials)
		if creds.APIKey == "" {
			continue
		}

		// Take a slot before spawning. Once the deadline has passed, any
		// further status call would only fail against the dead context, so
		// the remaining accounts are left for the next scan.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			slog.Warn("windsurf_status_refresh_scan_aborted", "error", ctx.Err())
			break dispatch
		case <-s.stopCh:
			break dispatch
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			s.refreshOneStatus(ctx, &acct, creds)
		}()
	}
	wg.Wait()
}
// refreshOneStatus fetches the user status for one account using its API key
// and stores the returned profile fields (user ID, team ID, email, display
// name) in the account's extra data.
//
// If the status call fails, the account's StatusRefreshFailures counter is
// incremented and saved. If the client cannot be built, the account is left
// untouched.
//
// account is mutated in place (Extra only).
//
// NOTE(review): account is a copy taken from the scan's list snapshot. If
// Update writes the whole record, a token refresh that completed in between
// could be overwritten with stale credentials. Verify against the repository
// implementation.
func (s *WindsurfRefreshService) refreshOneStatus(ctx context.Context, account *Account, creds WindsurfCredentials) {
	proxyURL := ""
	if account.ProxyID != nil {
		proxy, err := s.proxyRepo.GetByID(ctx, *account.ProxyID)
		if err != nil {
			// Best-effort fallback to a direct connection is preserved, but it
			// is now logged instead of happening silently.
			// NOTE(review): confirm that bypassing the assigned proxy is acceptable.
			slog.Warn("windsurf_status_proxy_lookup_failed",
				"account_id", account.ID, "proxy_id", *account.ProxyID, "error", err)
		} else {
			proxyURL = proxy.URL()
		}
	}

	baseURL := s.cfg.UserStatusBaseURL
	if baseURL == "" {
		baseURL = "https://server.codeium.com"
	}

	client, err := windsurf.NewClient(baseURL, proxyURL)
	if err != nil {
		slog.Warn("windsurf_status_client_failed", "account_id", account.ID, "error", err)
		return
	}

	userStatus, err := client.GetUserStatus(ctx, creds.APIKey)
	if err != nil {
		slog.Warn("windsurf_status_refresh_failed", "account_id", account.ID, "error", err)
		extra := LoadWindsurfExtra(account.Extra)
		extra.Refresh.StatusRefreshFailures++
		account.Extra = StoreWindsurfExtra(extra)
		if saveErr := s.accountRepo.Update(ctx, account); saveErr != nil {
			slog.Error("windsurf_status_failure_save_failed", "account_id", account.ID, "error", saveErr)
		}
		return
	}

	extra := LoadWindsurfExtra(account.Extra)
	extra.Profile.UserID = userStatus.UserID
	extra.Profile.TeamID = userStatus.TeamID
	extra.Profile.Email = userStatus.Email
	extra.Profile.DisplayName = userStatus.Name
	extra.Refresh.LastStatusRefreshAt = time.Now().Format(time.RFC3339)
	extra.Refresh.StatusRefreshFailures = 0
	account.Extra = StoreWindsurfExtra(extra)

	if err := s.accountRepo.Update(ctx, account); err != nil {
		slog.Error("windsurf_status_save_failed", "account_id", account.ID, "error", err)
	}
}