package windsurf import ( "context" "fmt" "log/slog" "net" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" ) type DockerDiscoveryConfig struct { // ContainerNamePrefix filters containers whose name starts with this prefix. // Default: "sub2api-windsurf-ls" ContainerNamePrefix string // FallbackHost is used when Docker hostnames can't be resolved (local dev). // Default: "127.0.0.1" FallbackHost string // DefaultCSRFToken is the CSRF token for LS gRPC calls. DefaultCSRFToken string // ProbeInterval controls how often health probes run. // Default: 30s ProbeInterval time.Duration // ProbeTimeout is the TCP dial timeout for health checks. // Default: 3s ProbeTimeout time.Duration // DiscoverInterval controls how often Docker API is polled for new containers. // Default: 60s DiscoverInterval time.Duration } func (c *DockerDiscoveryConfig) defaults() { if c.ContainerNamePrefix == "" { c.ContainerNamePrefix = "sub2api-windsurf-ls" } if c.FallbackHost == "" { c.FallbackHost = "127.0.0.1" } if c.DefaultCSRFToken == "" { c.DefaultCSRFToken = DefaultCSRF } if c.ProbeInterval <= 0 { c.ProbeInterval = 30 * time.Second } if c.ProbeTimeout <= 0 { c.ProbeTimeout = 3 * time.Second } if c.DiscoverInterval <= 0 { c.DiscoverInterval = 60 * time.Second } } type lsInstance struct { ContainerID string ContainerName string Host string Port int CSRFToken string Client *LocalLSClient Healthy atomic.Bool DiscoveredAt time.Time LastProbeAt time.Time LastProbeErr string } type DockerDiscoveryConnector struct { cfg DockerDiscoveryConfig mu sync.RWMutex instances []*lsInstance robin atomic.Uint64 cancel context.CancelFunc done chan struct{} } func NewDockerDiscoveryConnector(cfg DockerDiscoveryConfig) *DockerDiscoveryConnector { cfg.defaults() ctx, cancel := context.WithCancel(context.Background()) c := &DockerDiscoveryConnector{ cfg: cfg, cancel: cancel, done: make(chan struct{}), } go c.loop(ctx) return c } func (c *DockerDiscoveryConnector) Mode() string { return "docker" } func (c *DockerDiscoveryConnector) Acquire(_ context.Context, _ string) (*LSLease, error) { c.mu.RLock() defer c.mu.RUnlock() healthy := c.healthyInstances() if len(healthy) == 0 { return nil, fmt.Errorf("no healthy LS instances available") } idx := c.robin.Add(1) - 1 inst := healthy[idx%uint64(len(healthy))] return &LSLease{ Mode: "docker", Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port), Client: inst.Client, Release: func() {}, }, nil } // AcquireByID returns the LS instance matching containerID. Falls back to round-robin if not found. func (c *DockerDiscoveryConnector) AcquireByID(containerID string) (*LSLease, error) { if containerID == "" { return c.Acquire(context.Background(), "") } c.mu.RLock() defer c.mu.RUnlock() for _, inst := range c.instances { if inst.ContainerID == containerID || inst.ContainerName == containerID { if !inst.Healthy.Load() { slog.Warn("windsurf_ls_bound_unhealthy", "container", containerID) } return &LSLease{ Mode: "docker", Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port), Client: inst.Client, Release: func() {}, }, nil } } slog.Warn("windsurf_ls_bound_not_found", "container", containerID, "fallback", "round-robin") return c.acquireRoundRobin() } func (c *DockerDiscoveryConnector) acquireRoundRobin() (*LSLease, error) { healthy := c.healthyInstances() if len(healthy) == 0 { return nil, fmt.Errorf("no healthy LS instances available") } idx := c.robin.Add(1) - 1 inst := healthy[idx%uint64(len(healthy))] return &LSLease{ Mode: "docker", Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port), Client: inst.Client, Release: func() {}, }, nil } func (c *DockerDiscoveryConnector) Health(_ context.Context) error { c.mu.RLock() defer c.mu.RUnlock() if len(c.healthyInstances()) == 0 { return fmt.Errorf("no healthy LS instances") } return nil } func (c *DockerDiscoveryConnector) Status() *LSConnectorStatus { c.mu.RLock() defer c.mu.RUnlock() healthy := c.healthyInstances() return &LSConnectorStatus{ Mode: "docker", Healthy: len(healthy) > 0, Instances: len(c.instances), Endpoint: c.endpointSummary(healthy), } } func (c *DockerDiscoveryConnector) Shutdown() { c.cancel() <-c.done } // healthyInstances returns instances where Healthy is true. Caller must hold at least RLock. func (c *DockerDiscoveryConnector) healthyInstances() []*lsInstance { var result []*lsInstance for _, inst := range c.instances { if inst.Healthy.Load() { result = append(result, inst) } } return result } func (c *DockerDiscoveryConnector) endpointSummary(healthy []*lsInstance) string { if len(healthy) == 0 { return "none" } parts := make([]string, len(healthy)) for i, inst := range healthy { parts[i] = fmt.Sprintf("%s:%d", inst.Host, inst.Port) } return strings.Join(parts, ",") } func (c *DockerDiscoveryConnector) loop(ctx context.Context) { defer close(c.done) c.discover(ctx) c.probeAll(ctx) discoverTick := time.NewTicker(c.cfg.DiscoverInterval) probeTick := time.NewTicker(c.cfg.ProbeInterval) defer discoverTick.Stop() defer probeTick.Stop() for { select { case <-ctx.Done(): return case <-discoverTick.C: c.discover(ctx) case <-probeTick.C: c.probeAll(ctx) } } } func (c *DockerDiscoveryConnector) discover(ctx context.Context) { cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { slog.Warn("windsurf_ls_docker_client_error", "error", err) return } defer cli.Close() containers, err := cli.ContainerList(ctx, container.ListOptions{ Filters: filters.NewArgs( filters.Arg("status", "running"), ), }) if err != nil { slog.Warn("windsurf_ls_docker_list_error", "error", err) return } var found []*lsInstance for _, ctr := range containers { name := containerName(ctr.Names) if !strings.Contains(name, "windsurf-ls") { continue } host, port, csrfToken := c.extractEndpoint(ctr) if port == 0 { continue } found = append(found, &lsInstance{ ContainerID: ctr.ID[:12], ContainerName: name, Host: host, Port: port, CSRFToken: csrfToken, DiscoveredAt: time.Now(), }) } c.mu.Lock() c.reconcile(found) c.mu.Unlock() slog.Info("windsurf_ls_discovery", "found", len(found), "total", len(c.instances)) } func containerName(names []string) string { for _, n := range names { return strings.TrimPrefix(n, "/") } return "" } func (c *DockerDiscoveryConnector) extractEndpoint(ctr container.Summary) (string, int, string) { host := containerName(ctr.Names) csrfToken := c.cfg.DefaultCSRFToken for _, env := range ctr.Labels { // labels can carry csrf overrides if needed _ = env } if _, err := net.LookupHost(host); err != nil { host = c.cfg.FallbackHost } for _, p := range ctr.Ports { if p.PrivatePort == 42099 || p.PrivatePort == 42100 || (p.PublicPort >= 42099 && p.PublicPort <= 42200) { port := int(p.PublicPort) if port == 0 { port = int(p.PrivatePort) } // When port has a host-bound IP (e.g. 127.0.0.1:42100->42100), // use that IP instead of the container name. This ensures the // backend can reach the LS when running on the host (go run) // rather than inside the Docker network. if p.IP != "" && p.PublicPort > 0 { host = p.IP port = int(p.PublicPort) } else if host == c.cfg.FallbackHost && p.PublicPort > 0 { port = int(p.PublicPort) } for _, e := range envFromLabels(ctr.Labels) { if strings.HasPrefix(e, "LS_CSRF_TOKEN=") { csrfToken = strings.TrimPrefix(e, "LS_CSRF_TOKEN=") } } return host, port, csrfToken } } return host, 0, csrfToken } func envFromLabels(labels map[string]string) []string { var result []string for k, v := range labels { if strings.HasPrefix(k, "windsurf.") { result = append(result, strings.TrimPrefix(k, "windsurf.")+"="+v) } } return result } // reconcile merges discovered containers into the existing pool. Caller must hold Lock. func (c *DockerDiscoveryConnector) reconcile(found []*lsInstance) { existing := make(map[string]*lsInstance) for _, inst := range c.instances { existing[inst.ContainerID] = inst } var merged []*lsInstance for _, f := range found { if old, ok := existing[f.ContainerID]; ok { old.Host = f.Host old.Port = f.Port merged = append(merged, old) } else { f.Client = NewLocalLSClient(f.Port, f.CSRFToken) if f.Host != "localhost" && f.Host != "127.0.0.1" { f.Client.BaseURL = fmt.Sprintf("http://%s:%d", f.Host, f.Port) } merged = append(merged, f) } } if len(merged) == 0 && len(c.instances) > 0 { slog.Warn("windsurf_ls_discovery_empty", "keeping_old", len(c.instances)) return } c.instances = merged } func (c *DockerDiscoveryConnector) probeAll(ctx context.Context) { c.mu.RLock() snapshot := make([]*lsInstance, len(c.instances)) copy(snapshot, c.instances) c.mu.RUnlock() for _, inst := range snapshot { healthy := c.probeOne(ctx, inst) inst.Healthy.Store(healthy) inst.LastProbeAt = time.Now() if healthy { inst.LastProbeErr = "" } } } func (c *DockerDiscoveryConnector) probeOne(_ context.Context, inst *lsInstance) bool { addr := fmt.Sprintf("%s:%d", inst.Host, inst.Port) conn, err := net.DialTimeout("tcp", addr, c.cfg.ProbeTimeout) if err != nil { inst.LastProbeErr = err.Error() if inst.Healthy.Load() { slog.Warn("windsurf_ls_unhealthy", "container", inst.ContainerName, "addr", addr, "error", err) } return false } conn.Close() if !inst.Healthy.Load() { slog.Info("windsurf_ls_healthy", "container", inst.ContainerName, "addr", addr) } return true } // InstanceStatuses returns detailed status for each discovered instance (for admin API). func (c *DockerDiscoveryConnector) InstanceStatuses() []DockerLSInstanceStatus { c.mu.RLock() defer c.mu.RUnlock() result := make([]DockerLSInstanceStatus, len(c.instances)) for i, inst := range c.instances { result[i] = DockerLSInstanceStatus{ ContainerID: inst.ContainerID, ContainerName: inst.ContainerName, Host: inst.Host, Port: inst.Port, Healthy: inst.Healthy.Load(), DiscoveredAt: inst.DiscoveredAt, LastProbeAt: inst.LastProbeAt, LastProbeErr: inst.LastProbeErr, } } return result } type DockerLSInstanceStatus struct { ContainerID string `json:"container_id"` ContainerName string `json:"container_name"` Host string `json:"host"` Port int `json:"port"` Healthy bool `json:"healthy"` DiscoveredAt time.Time `json:"discovered_at"` LastProbeAt time.Time `json:"last_probe_at"` LastProbeErr string `json:"last_probe_err,omitempty"` } // NewCompatDockerConnector creates a discovery connector with a static fallback entry. // It uses the legacy host/port/csrf config as an initial static instance, then overlays // Docker API auto-discovery. If the configured host can't resolve, it falls back to 127.0.0.1. func NewCompatDockerConnector(host string, port int, discoveryCfg DockerDiscoveryConfig) *DockerDiscoveryConnector { resolvedHost := host if _, err := net.LookupHost(host); err != nil { resolvedHost = "127.0.0.1" slog.Info("windsurf_ls_host_fallback", "original", host, "resolved", resolvedHost) } if discoveryCfg.DefaultCSRFToken == "" { discoveryCfg.DefaultCSRFToken = DefaultCSRF } discoveryCfg.FallbackHost = "127.0.0.1" discoveryCfg.defaults() ctx, cancel := context.WithCancel(context.Background()) c := &DockerDiscoveryConnector{ cfg: discoveryCfg, cancel: cancel, done: make(chan struct{}), } csrfToken := discoveryCfg.DefaultCSRFToken staticInst := &lsInstance{ ContainerID: "static", ContainerName: fmt.Sprintf("static-%s-%d", host, port), Host: resolvedHost, Port: port, CSRFToken: csrfToken, Client: NewLocalLSClient(port, csrfToken), DiscoveredAt: time.Now(), } if resolvedHost != "localhost" && resolvedHost != "127.0.0.1" { staticInst.Client.BaseURL = fmt.Sprintf("http://%s:%d", resolvedHost, port) } c.mu.Lock() c.instances = []*lsInstance{staticInst} c.mu.Unlock() go c.loop(ctx) return c } // parsePortFromEnv extracts port from LS_PORT environment variable value. func parsePortFromEnv(envVars []string) int { for _, e := range envVars { if strings.HasPrefix(e, "LS_PORT=") { p, err := strconv.Atoi(strings.TrimPrefix(e, "LS_PORT=")) if err == nil { return p } } } return 0 }