460 lines
11 KiB
Go

package windsurf
import (
"bufio"
"context"
"crypto/tls"
"fmt"
"io"
"log/slog"
"net"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"golang.org/x/sync/singleflight"
)
// Fallback values applied by LSPoolConfig.defaults when the corresponding
// config field is left at its zero value.
const (
// DefaultLSBinary is the language-server binary path used when DiscoverBinary
// returns an error.
DefaultLSBinary = "/opt/windsurf/language_server_linux_x64"
// DefaultLSPort is the base port: the "default" instance listens on it and
// other instances are allocated starting from the next port up.
DefaultLSPort = 42100
// DefaultCSRF is the CSRF token handed to the LS via --csrf_token and to the
// local client when no token is configured.
DefaultCSRF = "windsurf-api-csrf-fixed-token"
// DefaultAPIServer is the --api_server_url value used when neither the config
// nor the CODEIUM_API_URL environment variable provides one.
DefaultAPIServer = "https://server.self-serve.windsurf.com"
)
// LSPoolConfig holds the settings used to launch language-server (LS)
// processes. Fields left at their zero value are filled in by defaults.
type LSPoolConfig struct {
// Binary is the path of the LS executable; its directory is also used as the
// working directory of the spawned process.
Binary string
// BasePort is the port of the "default" instance; ports for other instances
// are allocated starting at BasePort+1.
BasePort int
// CSRFToken is passed to every LS process via --csrf_token and to the local
// client created for it.
CSRFToken string
// APIServerURL is passed to the LS via --api_server_url.
APIServerURL string
// DataDir is the root directory under which each instance gets its own
// sub-directory named after its pool key.
DataDir string
}
// defaults fills every zero-valued field of c in place.
//
// Resolution order per field:
//   - Binary: whatever DiscoverBinary returns (it is handed a copy of the
//     config as it stands at that moment); on error the legacy Linux path
//     DefaultLSBinary is used so existing /opt/windsurf deployments keep
//     booting. A wrong or missing binary is reported later, at spawn time.
//   - BasePort: DefaultLSPort for any non-positive value.
//   - CSRFToken: DefaultCSRF.
//   - APIServerURL: the CODEIUM_API_URL environment variable, then
//     DefaultAPIServer.
//   - DataDir: resolveDataDir, called last so it sees the fields set above.
func (c *LSPoolConfig) defaults() {
	if c.Binary == "" {
		discovered, err := DiscoverBinary(*c)
		if err != nil {
			discovered = DefaultLSBinary
		}
		c.Binary = discovered
	}

	if c.BasePort <= 0 {
		c.BasePort = DefaultLSPort
	}

	if c.CSRFToken == "" {
		c.CSRFToken = DefaultCSRF
	}

	if c.APIServerURL == "" {
		if fromEnv := os.Getenv("CODEIUM_API_URL"); fromEnv != "" {
			c.APIServerURL = fromEnv
		} else {
			c.APIServerURL = DefaultAPIServer
		}
	}

	if c.DataDir == "" {
		c.DataDir = resolveDataDir(*c)
	}
}
// LSEntry describes one language-server instance tracked by an LSPool.
type LSEntry struct {
// Cmd is the spawned process; it is left nil for an instance that was adopted
// from an already-listening port rather than started by the pool.
Cmd *exec.Cmd
// Port is the local TCP port the instance listens on.
Port int
// CSRFToken is the token the instance was configured with.
CSRFToken string
// Client talks to the instance on Port using CSRFToken.
Client *LocalLSClient
// ProxyKey is the pool key (see proxyKey) this entry is stored under.
ProxyKey string
// Ready is set once the port accepts connections and cleared when the entry
// is stopped or its process exits.
Ready atomic.Bool
// StartedAt records when the entry was created.
StartedAt time.Time
// done is closed immediately for adopted instances, which have no process.
done chan struct{} // closed when the process exits
}
// LSPool manages language-server instances, one per proxy key.
type LSPool struct {
// pool maps a proxy key to its entry; guarded by mu.
pool map[string]*LSEntry
// mu protects pool.
mu sync.RWMutex
// sf collapses concurrent Ensure calls for the same key into one spawn.
sf singleflight.Group
// nextPort is the next candidate port for non-default instances.
nextPort atomic.Int32
// config is the defaults-applied configuration the pool was created with.
config LSPoolConfig
// logFunc is an optional printf-style sink; nil disables pool logging.
logFunc func(format string, args ...any)
}
// NewLSPool builds an empty pool from cfg after applying defaults to it.
// logFn receives printf-style diagnostics and may be nil to silence them.
// Port allocation for non-default instances starts at cfg.BasePort+1; the
// base port itself is reserved for the "default" instance.
func NewLSPool(cfg LSPoolConfig, logFn func(string, ...any)) *LSPool {
	cfg.defaults()

	pool := new(LSPool)
	pool.pool = make(map[string]*LSEntry)
	pool.config = cfg
	pool.logFunc = logFn
	pool.nextPort.Store(int32(cfg.BasePort + 1))
	return pool
}
// log forwards a printf-style message to the pool's log function, if any.
func (p *LSPool) log(format string, args ...any) {
	if p.logFunc == nil {
		return
	}
	p.logFunc(format, args...)
}
// nonAlphaNum matches every character that is not an ASCII letter or digit.
var nonAlphaNum = regexp.MustCompile(`[^a-zA-Z0-9]`)

// proxyKey derives the pool key for a proxy URL.
//
// A blank URL maps to "default". Otherwise the key is "px_" followed by the
// host, the port (when present) and the username (when userinfo is present),
// joined with "_" and with every non-alphanumeric character replaced by "_".
// Only the username takes part in the key — the password and the scheme do
// not — so two proxies differing only in those share one LS instance.
// A URL that fails to parse is sanitized as a whole.
func proxyKey(proxyURL string) string {
	trimmed := strings.TrimSpace(proxyURL)
	if trimmed == "" {
		return "default"
	}

	parsed, err := url.Parse(trimmed)
	if err != nil {
		return "px_" + nonAlphaNum.ReplaceAllString(trimmed, "_")
	}

	parts := []string{parsed.Hostname()}
	if port := parsed.Port(); port != "" {
		parts = append(parts, port)
	}
	if parsed.User != nil {
		parts = append(parts, nonAlphaNum.ReplaceAllString(parsed.User.Username(), "_"))
	}
	return "px_" + nonAlphaNum.ReplaceAllString(strings.Join(parts, "_"), "_")
}
// redactProxyURL returns proxyURL with its userinfo (username and password)
// removed so it can be written to logs. An empty input yields "none" and an
// unparseable one yields "<invalid>".
func redactProxyURL(proxyURL string) string {
	if len(proxyURL) == 0 {
		return "none"
	}
	if parsed, err := url.Parse(proxyURL); err == nil {
		parsed.User = nil
		return parsed.String()
	}
	return "<invalid>"
}
// Ensure returns a ready LS entry for proxyURL, spawning one when the pool has
// none. Concurrent calls for the same key share a single spawn through
// singleflight; the spawn receives the ctx of whichever caller started it.
func (p *LSPool) Ensure(ctx context.Context, proxyURL string) (*LSEntry, error) {
	key := proxyKey(proxyURL)

	// readyEntry reports the pooled entry for key when it exists and is ready.
	readyEntry := func() (*LSEntry, bool) {
		p.mu.RLock()
		defer p.mu.RUnlock()
		entry, found := p.pool[key]
		if found && entry.Ready.Load() {
			return entry, true
		}
		return nil, false
	}

	if entry, ok := readyEntry(); ok {
		return entry, nil
	}

	result, err, _ := p.sf.Do(key, func() (any, error) {
		// Re-check: another flight may have finished between the fast path
		// above and this call being scheduled.
		if entry, ok := readyEntry(); ok {
			return entry, nil
		}
		return p.spawnLS(ctx, key, proxyURL)
	})
	if err != nil {
		return nil, err
	}
	return result.(*LSEntry), nil
}
// Get returns the pooled entry for proxyURL, or nil when there is none.
// The entry is returned regardless of whether it is ready yet.
func (p *LSPool) Get(proxyURL string) *LSEntry {
	key := proxyKey(proxyURL)

	p.mu.RLock()
	entry := p.pool[key]
	p.mu.RUnlock()

	return entry
}
// Restart stops and forgets the current entry for proxyURL (if any) and then
// obtains a fresh one through Ensure. The old entry is stopped while the pool
// write lock is held.
func (p *LSPool) Restart(ctx context.Context, proxyURL string) (*LSEntry, error) {
	key := proxyKey(proxyURL)

	func() {
		p.mu.Lock()
		defer p.mu.Unlock()

		previous, found := p.pool[key]
		if !found {
			return
		}
		p.stopEntry(previous)
		delete(p.pool, key)
	}()

	return p.Ensure(ctx, proxyURL)
}
// Shutdown stops every pooled instance, logs each stop, and resets the pool to
// an empty map. The write lock is held for the whole operation.
func (p *LSPool) Shutdown() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for name, instance := range p.pool {
		p.stopEntry(instance)
		p.log("LS instance %s stopped", name)
	}

	p.pool = make(map[string]*LSEntry)
}
// stopEntry marks e as not ready and, when e owns a started process, hands it
// to terminateProcess together with the entry's done channel. Entries without
// a process (adopted instances) are only marked not ready.
func (p *LSPool) stopEntry(e *LSEntry) {
	e.Ready.Store(false)

	if cmd := e.Cmd; cmd != nil && cmd.Process != nil {
		terminateProcess(cmd.Process, e.done)
	}
}
// LSStatus is a point-in-time snapshot of an LSPool, as produced by Status.
type LSStatus struct {
// Running is true when the pool holds at least one entry.
Running bool
// Instances lists one record per pooled entry; it is nil for an empty pool.
Instances []LSInstanceStatus
}
// LSInstanceStatus describes a single pooled LS instance inside an LSStatus.
type LSInstanceStatus struct {
// Key is the pool map key the entry is stored under.
Key string
// Port is the local port the instance listens on.
Port int
// PID is the process ID, or 0 when the entry has no process handle.
PID int
// ProxyKey is the key recorded on the entry itself.
ProxyKey string
// StartedAt is the entry's creation time.
StartedAt time.Time
// Ready mirrors the entry's readiness flag at snapshot time.
Ready bool
}
// Status returns a snapshot of the pool taken under the read lock. Instances
// follow map iteration order, so their order is not stable between calls, and
// the slice stays nil when the pool is empty.
func (p *LSPool) Status() LSStatus {
	p.mu.RLock()
	defer p.mu.RUnlock()

	var snapshot LSStatus
	snapshot.Running = len(p.pool) > 0

	for key, entry := range p.pool {
		inst := LSInstanceStatus{
			Key:       key,
			Port:      entry.Port,
			ProxyKey:  entry.ProxyKey,
			StartedAt: entry.StartedAt,
			Ready:     entry.Ready.Load(),
		}
		// PID stays 0 for entries without a process handle.
		if entry.Cmd != nil && entry.Cmd.Process != nil {
			inst.PID = entry.Cmd.Process.Pid
		}
		snapshot.Instances = append(snapshot.Instances, inst)
	}
	return snapshot
}
// allocPort picks the port for a new instance. The default instance always
// gets the configured base port. Any other instance takes the next value of
// the pool-wide counter, skipping ports that already accept connections, and
// gives up with an error after 50 candidates.
func (p *LSPool) allocPort(isDefault bool) (int, error) {
	if isDefault {
		return p.config.BasePort, nil
	}

	const maxAttempts = 50
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Add returns the incremented value; the candidate is the value the
		// counter held before this call.
		candidate := int(p.nextPort.Add(1)) - 1
		if isPortInUse(candidate) {
			p.log("LS port %d busy, advancing", candidate)
			continue
		}
		return candidate, nil
	}
	return 0, fmt.Errorf("no free port for LS in 50 attempts starting from %d", p.config.BasePort+1)
}
func isPortInUse(port int) bool {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), time.Second)
if err != nil {
return false
}
conn.Close()
return true
}
// waitPortReady polls 127.0.0.1:port until a TCP connection succeeds or
// timeout elapses, pausing 500ms between attempts. Each attempt has its own
// 2s dial timeout, so the call can run somewhat past timeout.
//
// Readiness here means only that the port accepts a connection: the HTTP/2
// transport serves as a holder for the plain-TCP dial function, which is
// invoked directly, and no HTTP/2 handshake is performed.
func waitPortReady(port int, timeout time.Duration) error {
	target := fmt.Sprintf("127.0.0.1:%d", port)

	plainDial := func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
		dialer := net.Dialer{Timeout: 2 * time.Second}
		return dialer.DialContext(ctx, network, addr)
	}
	transport := &http2.Transport{
		AllowHTTP:      true,
		DialTLSContext: plainDial,
	}
	defer transport.CloseIdleConnections()

	for stopAt := time.Now().Add(timeout); time.Now().Before(stopAt); time.Sleep(500 * time.Millisecond) {
		conn, err := transport.DialTLSContext(context.Background(), "tcp", target, nil)
		if err != nil {
			continue
		}
		conn.Close()
		return nil
	}
	return fmt.Errorf("LS port %d not ready after %v", port, timeout)
}
// spawnLS creates the pool entry for key and returns it once it is usable.
//
// For the "default" key, if the base port already accepts connections the
// listener is adopted as-is: the entry gets no Cmd and its done channel is
// closed immediately. Otherwise a new LS process is started with its own data
// directory and sandboxed home under DataDir/key, registered in the pool, and
// awaited until its port accepts connections.
//
// ctx is deliberately not bound to the child process: the LS must outlive the
// request that happened to trigger its creation.
//
// An error is returned when no port can be allocated, a directory or pipe
// cannot be created, the process fails to start, the process exits before its
// port is ready, or the port is not ready within 25 seconds. In the last two
// cases the entry is removed from the pool before returning.
func (p *LSPool) spawnLS(ctx context.Context, key, proxyURL string) (*LSEntry, error) {
	isDefault := key == "default"
	if isDefault && isPortInUse(p.config.BasePort) {
		p.log("LS default port %d already in use — adopting existing instance", p.config.BasePort)
		entry := &LSEntry{
			Port:      p.config.BasePort,
			CSRFToken: p.config.CSRFToken,
			Client:    NewLocalLSClient(p.config.BasePort, p.config.CSRFToken),
			ProxyKey:  key,
			StartedAt: time.Now(),
			done:      make(chan struct{}),
		}
		entry.Ready.Store(true)
		// No process to wait for, so done is closed up front.
		close(entry.done)
		p.mu.Lock()
		p.pool[key] = entry
		p.mu.Unlock()
		return entry, nil
	}

	port, err := p.allocPort(isDefault)
	if err != nil {
		return nil, err
	}

	dataDir := filepath.Join(p.config.DataDir, key)
	// Build the db path once so the directory we create and the one we pass to
	// the LS are the same string on every platform.
	dbDir := filepath.Join(dataDir, "db")
	if err := os.MkdirAll(dbDir, 0o755); err != nil {
		return nil, fmt.Errorf("mkdirAll %s: %w", dbDir, err)
	}

	// Per-instance sandboxed HOME so the LS binary's telemetry/cache writes
	// stay inside dataDir instead of leaking into the invoker's real home or
	// /root. Required on macOS/Windows where /root does not exist.
	homeDir := instanceHomeDir(dataDir)
	if err := os.MkdirAll(homeDir, 0o755); err != nil {
		return nil, fmt.Errorf("mkdirAll home %s: %w", homeDir, err)
	}

	args := []string{
		fmt.Sprintf("--api_server_url=%s", p.config.APIServerURL),
		fmt.Sprintf("--server_port=%d", port),
		fmt.Sprintf("--csrf_token=%s", p.config.CSRFToken),
		"--register_user_url=https://api.codeium.com/register_user/",
		fmt.Sprintf("--codeium_dir=%s", dataDir),
		fmt.Sprintf("--database_dir=%s", dbDir),
		"--enable_local_search=false",
		"--enable_index_service=false",
		"--enable_lsp=false",
		"--detect_proxy=false",
	}

	// exec.Command rather than exec.CommandContext: cancelling the request
	// context must not kill the LS process.
	cmd := exec.Command(p.config.Binary, args...)
	cmd.Env = append(os.Environ(), homeEnvForPlatform(homeDir, runtime.GOOS)...)
	// Run with cwd = binary directory so the LS can find helper binaries
	// (e.g. `fd`) shipped alongside it in the official install layout.
	cmd.Dir = filepath.Dir(p.config.Binary)
	if proxyURL != "" {
		cmd.Env = append(cmd.Env,
			"HTTPS_PROXY="+proxyURL,
			"HTTP_PROXY="+proxyURL,
			"https_proxy="+proxyURL,
			"http_proxy="+proxyURL,
		)
	}

	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("ls stdout pipe %s: %w", key, err)
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		return nil, fmt.Errorf("ls stderr pipe %s: %w", key, err)
	}

	p.log("Starting LS instance key=%s port=%d proxy=%s", key, port, redactProxyURL(proxyURL))
	attachProcessGroup(cmd)
	if err := cmd.Start(); err != nil {
		return nil, wrapSpawnError(key, p.config.Binary, err)
	}

	pid := 0
	if cmd.Process != nil {
		pid = cmd.Process.Pid
	}
	go scanLSOutput(stdoutPipe, key, pid, "stdout")
	go scanLSOutput(stderrPipe, key, pid, "stderr")

	entry := &LSEntry{
		Cmd:       cmd,
		Port:      port,
		CSRFToken: p.config.CSRFToken,
		Client:    NewLocalLSClient(port, p.config.CSRFToken),
		ProxyKey:  key,
		StartedAt: time.Now(),
		done:      make(chan struct{}),
	}
	p.mu.Lock()
	p.pool[key] = entry
	p.mu.Unlock()
	go p.monitorProcess(key, entry)

	// Wait for the port, but give up as soon as the process is known to have
	// exited: polling a dead process for the full timeout only delays the
	// error. The channel is buffered so the poller goroutine can always
	// deliver its result and exit (at the latest when its own timeout ends).
	portReady := make(chan error, 1)
	go func() { portReady <- waitPortReady(port, 25*time.Second) }()

	var readyErr error
	exitedEarly := false
	select {
	case readyErr = <-portReady:
	case <-entry.done:
		exitedEarly = true
		readyErr = fmt.Errorf("LS instance %s exited before port %d became ready", key, port)
	}

	if readyErr != nil {
		p.log("LS instance %s failed to become ready: %v", key, readyErr)
		if !exitedEarly {
			// Stop it the same way Restart and Shutdown do instead of a bare
			// Process.Kill, so teardown is consistent across the pool.
			p.stopEntry(entry)
		}
		p.mu.Lock()
		// Only remove the key if it still refers to this entry.
		if p.pool[key] == entry {
			delete(p.pool, key)
		}
		p.mu.Unlock()
		// monitorProcess closes done once the process has been reaped.
		<-entry.done
		return nil, readyErr
	}

	entry.Ready.Store(true)
	p.log("LS instance %s ready on port %d", key, port)
	return entry, nil
}
// monitorProcess is the only place that waits on an LS process. It blocks
// until the process exits, then closes the entry's done channel, clears its
// ready flag, logs the exit result ("nil" for a clean exit), and removes the
// entry from the pool if the key still refers to this very entry.
func (p *LSPool) monitorProcess(key string, entry *LSEntry) {
	waitErr := entry.Cmd.Wait()
	close(entry.done)
	entry.Ready.Store(false)

	outcome := "nil"
	if waitErr != nil {
		outcome = waitErr.Error()
	}
	p.log("LS instance %s exited: %s", key, outcome)

	p.mu.Lock()
	defer p.mu.Unlock()
	// A missing key yields nil, which never equals the non-nil entry.
	if p.pool[key] == entry {
		delete(p.pool, key)
	}
}
// scanLSOutput forwards each line from the LS process's stdout/stderr to slog.
// The goroutine exits when the pipe is closed (i.e. when the LS process exits
// and the kernel closes the write end).
func scanLSOutput(r io.Reader, key string, pid int, stream string) {
sc := bufio.NewScanner(r)
// Allow up to 1 MiB per line to handle verbose panic stacks.
sc.Buffer(make([]byte, 4096), 1<<20)
for sc.Scan() {
line := strings.TrimRight(sc.Text(), "\r")
if line == "" {
continue
}
slog.Info("windsurf_ls_output",
"key", key,
"pid", pid,
"stream", stream,
"line", line,
)
}
if err := sc.Err(); err != nil && err != io.EOF {
slog.Debug("windsurf_ls_output_scan_error",
"key", key,
"pid", pid,
"stream", stream,
"error", err,
)
}
}
// wrapSpawnError annotates a cmd.Start() failure with platform-specific
// troubleshooting guidance so users don't have to guess why the LS binary
// refused to launch. The original error is preserved via %w so callers can
// still errors.Is/As against it.
func wrapSpawnError(key, binary string, err error) error {
base := fmt.Errorf("spawn LS %s (%s): %w", key, binary, err)
switch runtime.GOOS {
case "darwin":
return fmt.Errorf("%w — if macOS Gatekeeper blocked this, run: xattr -d com.apple.quarantine %s (or reinstall Windsurf from the official app)", base, binary)
case "windows":
return fmt.Errorf("%w — if Windows Defender/SmartScreen blocked this, verify the binary is not quarantined and has execute permissions", base)
}
return base
}