sub2api/backend/internal/pkg/windsurf/docker_discovery.go
win 21325afb33
Some checks failed
CI / test (push) Failing after 10s
CI / frontend (push) Failing after 8s
CI / golangci-lint (push) Failing after 5s
Security Scan / backend-security (push) Failing after 5s
Security Scan / frontend-security (push) Failing after 4s
feat(windsurf): 补全ops日志记录与endpoint派生,对齐其他平台
- windsurf_gateway_service: 添加上游延迟/TTFT/错误上下文记录
- endpoint: DeriveUpstreamEndpoint 添加 PlatformWindsurf 分支
- ops_error_logger: guessPlatformFromPath 添加 /windsurf/ 识别
2026-04-23 20:46:27 +08:00

494 lines
12 KiB
Go

package windsurf
import (
"context"
"fmt"
"log/slog"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
)
type DockerDiscoveryConfig struct {
// ContainerNamePrefix filters containers whose name starts with this prefix.
// Default: "sub2api-windsurf-ls"
ContainerNamePrefix string
// FallbackHost is used when Docker hostnames can't be resolved (local dev).
// Default: "127.0.0.1"
FallbackHost string
// DefaultCSRFToken is the CSRF token for LS gRPC calls.
DefaultCSRFToken string
// ProbeInterval controls how often health probes run.
// Default: 30s
ProbeInterval time.Duration
// ProbeTimeout is the TCP dial timeout for health checks.
// Default: 3s
ProbeTimeout time.Duration
// DiscoverInterval controls how often Docker API is polled for new containers.
// Default: 60s
DiscoverInterval time.Duration
}
func (c *DockerDiscoveryConfig) defaults() {
if c.ContainerNamePrefix == "" {
c.ContainerNamePrefix = "sub2api-windsurf-ls"
}
if c.FallbackHost == "" {
c.FallbackHost = "127.0.0.1"
}
if c.DefaultCSRFToken == "" {
c.DefaultCSRFToken = DefaultCSRF
}
if c.ProbeInterval <= 0 {
c.ProbeInterval = 30 * time.Second
}
if c.ProbeTimeout <= 0 {
c.ProbeTimeout = 3 * time.Second
}
if c.DiscoverInterval <= 0 {
c.DiscoverInterval = 60 * time.Second
}
}
type lsInstance struct {
ContainerID string
ContainerName string
Host string
Port int
CSRFToken string
Client *LocalLSClient
Healthy atomic.Bool
DiscoveredAt time.Time
LastProbeAt time.Time
LastProbeErr string
}
type DockerDiscoveryConnector struct {
cfg DockerDiscoveryConfig
mu sync.RWMutex
instances []*lsInstance
robin atomic.Uint64
cancel context.CancelFunc
done chan struct{}
}
func NewDockerDiscoveryConnector(cfg DockerDiscoveryConfig) *DockerDiscoveryConnector {
cfg.defaults()
ctx, cancel := context.WithCancel(context.Background())
c := &DockerDiscoveryConnector{
cfg: cfg,
cancel: cancel,
done: make(chan struct{}),
}
go c.loop(ctx)
return c
}
func (c *DockerDiscoveryConnector) Mode() string { return "docker" }
func (c *DockerDiscoveryConnector) Acquire(_ context.Context, _ string) (*LSLease, error) {
c.mu.RLock()
defer c.mu.RUnlock()
healthy := c.healthyInstances()
if len(healthy) == 0 {
return nil, fmt.Errorf("no healthy LS instances available")
}
idx := c.robin.Add(1) - 1
inst := healthy[idx%uint64(len(healthy))]
return &LSLease{
Mode: "docker",
Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port),
Client: inst.Client,
Release: func() {},
}, nil
}
// AcquireByID returns the LS instance matching containerID. Falls back to round-robin if not found.
func (c *DockerDiscoveryConnector) AcquireByID(containerID string) (*LSLease, error) {
if containerID == "" {
return c.Acquire(context.Background(), "")
}
c.mu.RLock()
defer c.mu.RUnlock()
for _, inst := range c.instances {
if inst.ContainerID == containerID || inst.ContainerName == containerID {
if !inst.Healthy.Load() {
slog.Warn("windsurf_ls_bound_unhealthy", "container", containerID)
}
return &LSLease{
Mode: "docker",
Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port),
Client: inst.Client,
Release: func() {},
}, nil
}
}
slog.Warn("windsurf_ls_bound_not_found", "container", containerID, "fallback", "round-robin")
return c.acquireRoundRobin()
}
func (c *DockerDiscoveryConnector) acquireRoundRobin() (*LSLease, error) {
healthy := c.healthyInstances()
if len(healthy) == 0 {
return nil, fmt.Errorf("no healthy LS instances available")
}
idx := c.robin.Add(1) - 1
inst := healthy[idx%uint64(len(healthy))]
return &LSLease{
Mode: "docker",
Endpoint: fmt.Sprintf("%s:%d", inst.Host, inst.Port),
Client: inst.Client,
Release: func() {},
}, nil
}
func (c *DockerDiscoveryConnector) Health(_ context.Context) error {
c.mu.RLock()
defer c.mu.RUnlock()
if len(c.healthyInstances()) == 0 {
return fmt.Errorf("no healthy LS instances")
}
return nil
}
func (c *DockerDiscoveryConnector) Status() *LSConnectorStatus {
c.mu.RLock()
defer c.mu.RUnlock()
healthy := c.healthyInstances()
return &LSConnectorStatus{
Mode: "docker",
Healthy: len(healthy) > 0,
Instances: len(c.instances),
Endpoint: c.endpointSummary(healthy),
}
}
func (c *DockerDiscoveryConnector) Shutdown() {
c.cancel()
<-c.done
}
// healthyInstances returns instances where Healthy is true. Caller must hold at least RLock.
func (c *DockerDiscoveryConnector) healthyInstances() []*lsInstance {
var result []*lsInstance
for _, inst := range c.instances {
if inst.Healthy.Load() {
result = append(result, inst)
}
}
return result
}
func (c *DockerDiscoveryConnector) endpointSummary(healthy []*lsInstance) string {
if len(healthy) == 0 {
return "none"
}
parts := make([]string, len(healthy))
for i, inst := range healthy {
parts[i] = fmt.Sprintf("%s:%d", inst.Host, inst.Port)
}
return strings.Join(parts, ",")
}
func (c *DockerDiscoveryConnector) loop(ctx context.Context) {
defer close(c.done)
c.discover(ctx)
c.probeAll(ctx)
discoverTick := time.NewTicker(c.cfg.DiscoverInterval)
probeTick := time.NewTicker(c.cfg.ProbeInterval)
defer discoverTick.Stop()
defer probeTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-discoverTick.C:
c.discover(ctx)
case <-probeTick.C:
c.probeAll(ctx)
}
}
}
func (c *DockerDiscoveryConnector) discover(ctx context.Context) {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
slog.Warn("windsurf_ls_docker_client_error", "error", err)
return
}
defer cli.Close()
containers, err := cli.ContainerList(ctx, container.ListOptions{
Filters: filters.NewArgs(
filters.Arg("status", "running"),
),
})
if err != nil {
slog.Warn("windsurf_ls_docker_list_error", "error", err)
return
}
var found []*lsInstance
for _, ctr := range containers {
name := containerName(ctr.Names)
if !strings.Contains(name, "windsurf-ls") {
continue
}
host, port, csrfToken := c.extractEndpoint(ctr)
if port == 0 {
continue
}
found = append(found, &lsInstance{
ContainerID: ctr.ID[:12],
ContainerName: name,
Host: host,
Port: port,
CSRFToken: csrfToken,
DiscoveredAt: time.Now(),
})
}
c.mu.Lock()
c.reconcile(found)
c.mu.Unlock()
slog.Info("windsurf_ls_discovery", "found", len(found), "total", len(c.instances))
}
func containerName(names []string) string {
for _, n := range names {
return strings.TrimPrefix(n, "/")
}
return ""
}
func (c *DockerDiscoveryConnector) extractEndpoint(ctr container.Summary) (string, int, string) {
host := containerName(ctr.Names)
csrfToken := c.cfg.DefaultCSRFToken
for _, env := range ctr.Labels {
// labels can carry csrf overrides if needed
_ = env
}
if _, err := net.LookupHost(host); err != nil {
host = c.cfg.FallbackHost
}
for _, p := range ctr.Ports {
if p.PrivatePort == 42099 || p.PrivatePort == 42100 || (p.PublicPort >= 42099 && p.PublicPort <= 42200) {
port := int(p.PublicPort)
if port == 0 {
port = int(p.PrivatePort)
}
// When port has a host-bound IP (e.g. 127.0.0.1:42100->42100),
// use that IP instead of the container name. This ensures the
// backend can reach the LS when running on the host (go run)
// rather than inside the Docker network.
if p.IP != "" && p.PublicPort > 0 {
host = p.IP
port = int(p.PublicPort)
} else if host == c.cfg.FallbackHost && p.PublicPort > 0 {
port = int(p.PublicPort)
}
for _, e := range envFromLabels(ctr.Labels) {
if strings.HasPrefix(e, "LS_CSRF_TOKEN=") {
csrfToken = strings.TrimPrefix(e, "LS_CSRF_TOKEN=")
}
}
return host, port, csrfToken
}
}
return host, 0, csrfToken
}
func envFromLabels(labels map[string]string) []string {
var result []string
for k, v := range labels {
if strings.HasPrefix(k, "windsurf.") {
result = append(result, strings.TrimPrefix(k, "windsurf.")+"="+v)
}
}
return result
}
// reconcile merges discovered containers into the existing pool. Caller must hold Lock.
func (c *DockerDiscoveryConnector) reconcile(found []*lsInstance) {
existing := make(map[string]*lsInstance)
for _, inst := range c.instances {
existing[inst.ContainerID] = inst
}
var merged []*lsInstance
for _, f := range found {
if old, ok := existing[f.ContainerID]; ok {
old.Host = f.Host
old.Port = f.Port
merged = append(merged, old)
} else {
f.Client = NewLocalLSClient(f.Port, f.CSRFToken)
if f.Host != "localhost" && f.Host != "127.0.0.1" {
f.Client.BaseURL = fmt.Sprintf("http://%s:%d", f.Host, f.Port)
}
merged = append(merged, f)
}
}
if len(merged) == 0 && len(c.instances) > 0 {
slog.Warn("windsurf_ls_discovery_empty", "keeping_old", len(c.instances))
return
}
c.instances = merged
}
func (c *DockerDiscoveryConnector) probeAll(ctx context.Context) {
c.mu.RLock()
snapshot := make([]*lsInstance, len(c.instances))
copy(snapshot, c.instances)
c.mu.RUnlock()
for _, inst := range snapshot {
healthy := c.probeOne(ctx, inst)
inst.Healthy.Store(healthy)
inst.LastProbeAt = time.Now()
if healthy {
inst.LastProbeErr = ""
}
}
}
func (c *DockerDiscoveryConnector) probeOne(_ context.Context, inst *lsInstance) bool {
addr := fmt.Sprintf("%s:%d", inst.Host, inst.Port)
conn, err := net.DialTimeout("tcp", addr, c.cfg.ProbeTimeout)
if err != nil {
inst.LastProbeErr = err.Error()
if inst.Healthy.Load() {
slog.Warn("windsurf_ls_unhealthy", "container", inst.ContainerName, "addr", addr, "error", err)
}
return false
}
conn.Close()
if !inst.Healthy.Load() {
slog.Info("windsurf_ls_healthy", "container", inst.ContainerName, "addr", addr)
}
return true
}
// InstanceStatuses returns detailed status for each discovered instance (for admin API).
func (c *DockerDiscoveryConnector) InstanceStatuses() []DockerLSInstanceStatus {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]DockerLSInstanceStatus, len(c.instances))
for i, inst := range c.instances {
result[i] = DockerLSInstanceStatus{
ContainerID: inst.ContainerID,
ContainerName: inst.ContainerName,
Host: inst.Host,
Port: inst.Port,
Healthy: inst.Healthy.Load(),
DiscoveredAt: inst.DiscoveredAt,
LastProbeAt: inst.LastProbeAt,
LastProbeErr: inst.LastProbeErr,
}
}
return result
}
type DockerLSInstanceStatus struct {
ContainerID string `json:"container_id"`
ContainerName string `json:"container_name"`
Host string `json:"host"`
Port int `json:"port"`
Healthy bool `json:"healthy"`
DiscoveredAt time.Time `json:"discovered_at"`
LastProbeAt time.Time `json:"last_probe_at"`
LastProbeErr string `json:"last_probe_err,omitempty"`
}
// NewCompatDockerConnector creates a discovery connector with a static fallback entry.
// It uses the legacy host/port/csrf config as an initial static instance, then overlays
// Docker API auto-discovery. If the configured host can't resolve, it falls back to 127.0.0.1.
func NewCompatDockerConnector(host string, port int, discoveryCfg DockerDiscoveryConfig) *DockerDiscoveryConnector {
resolvedHost := host
if _, err := net.LookupHost(host); err != nil {
resolvedHost = "127.0.0.1"
slog.Info("windsurf_ls_host_fallback", "original", host, "resolved", resolvedHost)
}
if discoveryCfg.DefaultCSRFToken == "" {
discoveryCfg.DefaultCSRFToken = DefaultCSRF
}
discoveryCfg.FallbackHost = "127.0.0.1"
discoveryCfg.defaults()
ctx, cancel := context.WithCancel(context.Background())
c := &DockerDiscoveryConnector{
cfg: discoveryCfg,
cancel: cancel,
done: make(chan struct{}),
}
csrfToken := discoveryCfg.DefaultCSRFToken
staticInst := &lsInstance{
ContainerID: "static",
ContainerName: fmt.Sprintf("static-%s-%d", host, port),
Host: resolvedHost,
Port: port,
CSRFToken: csrfToken,
Client: NewLocalLSClient(port, csrfToken),
DiscoveredAt: time.Now(),
}
if resolvedHost != "localhost" && resolvedHost != "127.0.0.1" {
staticInst.Client.BaseURL = fmt.Sprintf("http://%s:%d", resolvedHost, port)
}
c.mu.Lock()
c.instances = []*lsInstance{staticInst}
c.mu.Unlock()
go c.loop(ctx)
return c
}
// parsePortFromEnv extracts port from LS_PORT environment variable value.
func parsePortFromEnv(envVars []string) int {
for _, e := range envVars {
if strings.HasPrefix(e, "LS_PORT=") {
p, err := strconv.Atoi(strings.TrimPrefix(e, "LS_PORT="))
if err == nil {
return p
}
}
}
return 0
}