- windsurf_gateway_service: 添加上游延迟/TTFT/错误上下文记录 - endpoint: DeriveUpstreamEndpoint 添加 PlatformWindsurf 分支 - ops_error_logger: guessPlatformFromPath 添加 /windsurf/ 识别
274 lines
6.5 KiB
Go
274 lines
6.5 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
|
"github.com/Wei-Shaw/sub2api/internal/domain"
|
|
"github.com/Wei-Shaw/sub2api/internal/pkg/windsurf"
|
|
)
|
|
|
|
type WindsurfRefreshService struct {
|
|
cfg config.WindsurfConfig
|
|
accountRepo AccountRepository
|
|
proxyRepo ProxyRepository
|
|
authClient *windsurf.AuthClient
|
|
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewWindsurfRefreshService(
|
|
cfg config.WindsurfConfig,
|
|
accountRepo AccountRepository,
|
|
proxyRepo ProxyRepository,
|
|
authClient *windsurf.AuthClient,
|
|
) *WindsurfRefreshService {
|
|
return &WindsurfRefreshService{
|
|
cfg: cfg,
|
|
accountRepo: accountRepo,
|
|
proxyRepo: proxyRepo,
|
|
authClient: authClient,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) Start() {
|
|
if !s.cfg.Refresh.Enabled {
|
|
slog.Info("windsurf_refresh_disabled")
|
|
return
|
|
}
|
|
|
|
interval := s.cfg.Refresh.TokenScanInterval
|
|
if interval <= 0 {
|
|
interval = 5 * time.Minute
|
|
}
|
|
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
s.tokenRefreshLoop(interval)
|
|
}()
|
|
|
|
statusInterval := s.cfg.Refresh.StatusRefreshInterval
|
|
if statusInterval > 0 {
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
s.statusRefreshLoop(statusInterval)
|
|
}()
|
|
}
|
|
|
|
slog.Info("windsurf_refresh_started",
|
|
"token_interval", interval,
|
|
"status_interval", statusInterval,
|
|
)
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) Stop() {
|
|
close(s.stopCh)
|
|
s.wg.Wait()
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) tokenRefreshLoop(interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
s.scanAndRefreshTokens()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) statusRefreshLoop(interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
s.scanAndRefreshStatus()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) scanAndRefreshTokens() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
defer cancel()
|
|
|
|
accounts, err := s.accountRepo.ListByPlatform(ctx, domain.PlatformWindsurf)
|
|
if err != nil {
|
|
slog.Error("windsurf_refresh_list_failed", "error", err)
|
|
return
|
|
}
|
|
|
|
beforeExpiry := s.cfg.Refresh.RefreshBeforeExpiry
|
|
if beforeExpiry <= 0 {
|
|
beforeExpiry = 10 * time.Minute
|
|
}
|
|
|
|
concurrency := s.cfg.Refresh.WorkerConcurrency
|
|
if concurrency <= 0 {
|
|
concurrency = 4
|
|
}
|
|
|
|
sem := make(chan struct{}, concurrency)
|
|
var wg sync.WaitGroup
|
|
|
|
for i := range accounts {
|
|
acct := accounts[i]
|
|
creds := LoadWindsurfCredentials(acct.Credentials)
|
|
|
|
if !creds.NeedsRefresh(beforeExpiry) {
|
|
continue
|
|
}
|
|
if creds.AuthMethod != "firebase" || creds.RefreshToken == "" {
|
|
continue
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
s.refreshOneToken(ctx, &acct, creds)
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) refreshOneToken(ctx context.Context, account *Account, creds WindsurfCredentials) {
|
|
proxyURL := ""
|
|
if account.ProxyID != nil {
|
|
proxy, err := s.proxyRepo.GetByID(ctx, *account.ProxyID)
|
|
if err == nil {
|
|
proxyURL = proxy.URL()
|
|
}
|
|
}
|
|
|
|
result, err := s.authClient.RefreshFirebaseToken(ctx, creds.RefreshToken, proxyURL)
|
|
if err != nil {
|
|
extra := LoadWindsurfExtra(account.Extra)
|
|
extra.Refresh.TokenRefreshFailures++
|
|
account.Extra = StoreWindsurfExtra(extra)
|
|
_ = s.accountRepo.Update(ctx, account)
|
|
slog.Warn("windsurf_token_refresh_failed", "account_id", account.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
creds.IDToken = result.IDToken
|
|
creds.RefreshToken = result.RefreshToken
|
|
creds.ExpiresAt = time.Now().Add(time.Duration(result.ExpiresIn) * time.Second).Format(time.RFC3339)
|
|
creds.LastRefreshAt = time.Now().Format(time.RFC3339)
|
|
|
|
regResult, err := s.authClient.ReRegisterWithCodeium(ctx, result.IDToken, proxyURL)
|
|
if err != nil {
|
|
slog.Warn("windsurf_reregister_failed", "account_id", account.ID, "error", err)
|
|
} else {
|
|
creds.APIKey = regResult.APIKey
|
|
creds.LastReregisterAt = time.Now().Format(time.RFC3339)
|
|
}
|
|
|
|
account.Credentials = StoreWindsurfCredentials(creds)
|
|
extra := LoadWindsurfExtra(account.Extra)
|
|
extra.Refresh.LastTokenRefreshAt = time.Now().Format(time.RFC3339)
|
|
extra.Refresh.TokenRefreshFailures = 0
|
|
account.Extra = StoreWindsurfExtra(extra)
|
|
|
|
if err := s.accountRepo.Update(ctx, account); err != nil {
|
|
slog.Error("windsurf_refresh_save_failed", "account_id", account.ID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) scanAndRefreshStatus() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
defer cancel()
|
|
|
|
accounts, err := s.accountRepo.ListByPlatform(ctx, domain.PlatformWindsurf)
|
|
if err != nil {
|
|
slog.Error("windsurf_status_refresh_list_failed", "error", err)
|
|
return
|
|
}
|
|
|
|
concurrency := s.cfg.Refresh.WorkerConcurrency
|
|
if concurrency <= 0 {
|
|
concurrency = 4
|
|
}
|
|
|
|
sem := make(chan struct{}, concurrency)
|
|
var wg sync.WaitGroup
|
|
|
|
for i := range accounts {
|
|
acct := accounts[i]
|
|
creds := LoadWindsurfCredentials(acct.Credentials)
|
|
if creds.APIKey == "" {
|
|
continue
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
s.refreshOneStatus(ctx, &acct, creds)
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (s *WindsurfRefreshService) refreshOneStatus(ctx context.Context, account *Account, creds WindsurfCredentials) {
|
|
proxyURL := ""
|
|
if account.ProxyID != nil {
|
|
proxy, err := s.proxyRepo.GetByID(ctx, *account.ProxyID)
|
|
if err == nil {
|
|
proxyURL = proxy.URL()
|
|
}
|
|
}
|
|
|
|
baseURL := s.cfg.UserStatusBaseURL
|
|
if baseURL == "" {
|
|
baseURL = "https://server.codeium.com"
|
|
}
|
|
client, err := windsurf.NewClient(baseURL, proxyURL)
|
|
if err != nil {
|
|
slog.Warn("windsurf_status_client_failed", "account_id", account.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
userStatus, err := client.GetUserStatus(ctx, creds.APIKey)
|
|
if err != nil {
|
|
extra := LoadWindsurfExtra(account.Extra)
|
|
extra.Refresh.StatusRefreshFailures++
|
|
account.Extra = StoreWindsurfExtra(extra)
|
|
_ = s.accountRepo.Update(ctx, account)
|
|
slog.Warn("windsurf_status_refresh_failed", "account_id", account.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
extra := LoadWindsurfExtra(account.Extra)
|
|
extra.Profile.UserID = userStatus.UserID
|
|
extra.Profile.TeamID = userStatus.TeamID
|
|
extra.Profile.Email = userStatus.Email
|
|
extra.Profile.DisplayName = userStatus.Name
|
|
extra.Refresh.LastStatusRefreshAt = time.Now().Format(time.RFC3339)
|
|
extra.Refresh.StatusRefreshFailures = 0
|
|
account.Extra = StoreWindsurfExtra(extra)
|
|
|
|
if err := s.accountRepo.Update(ctx, account); err != nil {
|
|
slog.Error("windsurf_status_save_failed", "account_id", account.ID, "error", err)
|
|
}
|
|
}
|