package service import ( "context" "fmt" "io/fs" "log/slog" "net/http" "os" "path/filepath" "time" connect "connectrpc.com/connect" "github.com/Wei-Shaw/sub2api/internal/gen/language_server_pb" "github.com/Wei-Shaw/sub2api/internal/gen/language_server_pbconnect" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" "google.golang.org/protobuf/types/known/timestamppb" ) const upstreamLSRPCBaseURL = "https://cloudcode-pa.googleapis.com" // LSRPCHandler implements LanguageServerServiceHandler by proxying to the real upstream // lsrpc service using OAuth tokens obtained from AntigravityGatewayService. // File RPCs (ReadFile/WriteFile/ReadDir/etc.) operate on the local filesystem. type LSRPCHandler struct { language_server_pbconnect.UnimplementedLanguageServerServiceHandler antigravitySvc *AntigravityGatewayService accountRepo AccountRepository logger *slog.Logger } // NewLSRPCHandler creates a new LSRPCHandler. func NewLSRPCHandler( antigravitySvc *AntigravityGatewayService, accountRepo AccountRepository, logger *slog.Logger, ) *LSRPCHandler { if logger == nil { logger = slog.Default() } return &LSRPCHandler{ antigravitySvc: antigravitySvc, accountRepo: accountRepo, logger: logger, } } // upstreamClient creates a connectrpc client to the real lsrpc upstream, // authenticated with the OAuth token from the given account. func (h *LSRPCHandler) upstreamClient(ctx context.Context) (language_server_pbconnect.LanguageServerServiceClient, error) { accounts, err := h.accountRepo.ListByPlatform(ctx, PlatformAntigravity) if err != nil || len(accounts) == 0 { return nil, fmt.Errorf("no antigravity accounts available: %w", err) } account := &accounts[0] tokenProvider := h.antigravitySvc.GetTokenProvider() if tokenProvider == nil { return nil, fmt.Errorf("antigravity token provider not configured") } accessToken, err := tokenProvider.GetAccessToken(ctx, account) if err != nil { return nil, fmt.Errorf("failed to get access token: %w", err) } httpClient := &http.Client{ Timeout: 5 * time.Minute, Transport: &bearerTransport{ base: http.DefaultTransport, token: accessToken, }, } client := language_server_pbconnect.NewLanguageServerServiceClient( httpClient, upstreamLSRPCBaseURL, connect.WithGRPC(), ) return client, nil } // bearerTransport injects Authorization: Bearer into every request. type bearerTransport struct { base http.RoundTripper token string } func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) { clone := req.Clone(req.Context()) clone.Header.Set("Authorization", "Bearer "+t.token) return t.base.RoundTrip(clone) } // ============================================================================ // Cascade RPCs — proxied to real upstream // ============================================================================ func (h *LSRPCHandler) StartCascade( ctx context.Context, req *connect.Request[language_server_pb.StartCascadeRequest], ) (*connect.Response[language_server_pb.StartCascadeResponse], error) { client, err := h.upstreamClient(ctx) if err != nil { return nil, connect.NewError(connect.CodeUnavailable, err) } return client.StartCascade(ctx, req) } func (h *LSRPCHandler) SendUserCascadeMessage( ctx context.Context, req *connect.Request[language_server_pb.SendUserCascadeMessageRequest], stream *connect.ServerStream[language_server_pb.CascadeReactiveUpdate], ) error { client, err := h.upstreamClient(ctx) if err != nil { return connect.NewError(connect.CodeUnavailable, err) } upstreamStream, err := client.SendUserCascadeMessage(ctx, req) if err != nil { return err } defer upstreamStream.Close() for upstreamStream.Receive() { if err := stream.Send(upstreamStream.Msg()); err != nil { return err } } return upstreamStream.Err() } func (h *LSRPCHandler) CancelCascadeInvocation( ctx context.Context, req *connect.Request[language_server_pb.CancelCascadeInvocationRequest], ) (*connect.Response[language_server_pb.CancelCascadeInvocationResponse], error) { client, err := h.upstreamClient(ctx) if err != nil { return nil, connect.NewError(connect.CodeUnavailable, err) } return client.CancelCascadeInvocation(ctx, req) } func (h *LSRPCHandler) AcknowledgeCascadeCodeEdit( ctx context.Context, req *connect.Request[language_server_pb.AcknowledgeCascadeCodeEditRequest], ) (*connect.Response[language_server_pb.AcknowledgeCascadeCodeEditResponse], error) { client, err := h.upstreamClient(ctx) if err != nil { return nil, connect.NewError(connect.CodeUnavailable, err) } return client.AcknowledgeCascadeCodeEdit(ctx, req) } // ============================================================================ // Model config RPCs — proxied to real upstream // ============================================================================ func (h *LSRPCHandler) GetCascadeModelConfigs( ctx context.Context, req *connect.Request[language_server_pb.GetCascadeModelConfigsRequest], ) (*connect.Response[language_server_pb.GetCascadeModelConfigsResponse], error) { client, err := h.upstreamClient(ctx) if err != nil { // Fall back to static list when upstream unavailable. return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{ Models: staticCascadeModels(), }), nil } resp, err := client.GetCascadeModelConfigs(ctx, req) if err != nil { return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{ Models: staticCascadeModels(), }), nil } return resp, nil } func (h *LSRPCHandler) GetCommandModelConfigs( ctx context.Context, req *connect.Request[language_server_pb.GetCommandModelConfigsRequest], ) (*connect.Response[language_server_pb.GetCommandModelConfigsResponse], error) { client, err := h.upstreamClient(ctx) if err != nil { return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{ Models: staticCascadeModels(), }), nil } resp, err := client.GetCommandModelConfigs(ctx, req) if err != nil { return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{ Models: staticCascadeModels(), }), nil } return resp, nil } // staticCascadeModels returns a hard-coded model list as fallback. func staticCascadeModels() []*language_server_pb.ModelConfig { return []*language_server_pb.ModelConfig{ {Name: "claude-opus-4-7", DisplayName: "Claude Opus 4.7", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"}, {Name: "claude-opus-4-6", DisplayName: "Claude Opus 4.6", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"}, {Name: "claude-sonnet-4-6", DisplayName: "Claude Sonnet 4.6", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"}, {Name: "claude-haiku-4-5", DisplayName: "Claude Haiku 4.5", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"}, } } // ============================================================================ // File RPCs — local filesystem implementation // ============================================================================ func (h *LSRPCHandler) ReadFile( ctx context.Context, req *connect.Request[language_server_pb.ReadFileRequest], ) (*connect.Response[language_server_pb.ReadFileResponse], error) { path := req.Msg.GetPath() data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("file not found: %s", path)) } return nil, connect.NewError(connect.CodeInternal, err) } return connect.NewResponse(&language_server_pb.ReadFileResponse{ Content: string(data), }), nil } func (h *LSRPCHandler) WriteFile( ctx context.Context, req *connect.Request[language_server_pb.WriteFileRequest], ) (*connect.Response[language_server_pb.WriteFileResponse], error) { path := req.Msg.GetPath() if req.Msg.GetCreateParent() { if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return nil, connect.NewError(connect.CodeInternal, err) } } if err := os.WriteFile(path, []byte(req.Msg.GetContent()), 0o644); err != nil { return nil, connect.NewError(connect.CodeInternal, err) } return connect.NewResponse(&language_server_pb.WriteFileResponse{}), nil } func (h *LSRPCHandler) ReadDir( ctx context.Context, req *connect.Request[language_server_pb.ReadDirRequest], ) (*connect.Response[language_server_pb.ReadDirResponse], error) { path := req.Msg.GetPath() entries, err := os.ReadDir(path) if err != nil { if os.IsNotExist(err) { return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("directory not found: %s", path)) } return nil, connect.NewError(connect.CodeInternal, err) } files := make([]*language_server_pb.FileInfo, 0, len(entries)) for _, entry := range entries { info, err := entry.Info() if err != nil { continue } files = append(files, fileInfoFromOS(entry.Name(), info)) } return connect.NewResponse(&language_server_pb.ReadDirResponse{ Files: files, }), nil } func (h *LSRPCHandler) DeleteFileOrDirectory( ctx context.Context, req *connect.Request[language_server_pb.DeleteFileOrDirectoryRequest], ) (*connect.Response[language_server_pb.DeleteFileOrDirectoryResponse], error) { path := req.Msg.GetPath() if err := os.RemoveAll(path); err != nil { return nil, connect.NewError(connect.CodeInternal, err) } return connect.NewResponse(&language_server_pb.DeleteFileOrDirectoryResponse{}), nil } func (h *LSRPCHandler) StatUri( ctx context.Context, req *connect.Request[language_server_pb.StatUriRequest], ) (*connect.Response[language_server_pb.StatUriResponse], error) { path := req.Msg.GetPath() info, err := os.Stat(path) if err != nil { if os.IsNotExist(err) { return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("path not found: %s", path)) } return nil, connect.NewError(connect.CodeInternal, err) } return connect.NewResponse(&language_server_pb.StatUriResponse{ FileInfo: fileInfoFromOS(info.Name(), info), }), nil } func (h *LSRPCHandler) WatchDirectory( ctx context.Context, req *connect.Request[language_server_pb.WatchDirectoryRequest], stream *connect.ServerStream[language_server_pb.WatchDirectoryResponse], ) error { // Block until context is cancelled — real FS watching requires fsnotify which // is not in the dependency graph yet. This satisfies the interface contract // without crashing; the client will get an EOF when the connection closes. <-ctx.Done() return nil } // ============================================================================ // Health RPCs // ============================================================================ func (h *LSRPCHandler) Heartbeat( ctx context.Context, req *connect.Request[language_server_pb.HeartbeatRequest], ) (*connect.Response[language_server_pb.HeartbeatResponse], error) { return connect.NewResponse(&language_server_pb.HeartbeatResponse{ Healthy: true, Version: "sub2api", }), nil } func (h *LSRPCHandler) GetStatus( ctx context.Context, req *connect.Request[language_server_pb.GetStatusRequest], ) (*connect.Response[language_server_pb.GetStatusResponse], error) { return connect.NewResponse(&language_server_pb.GetStatusResponse{ Status: "running", Version: antigravity.BaseURL, }), nil } // ============================================================================ // Helpers // ============================================================================ func fileInfoFromOS(name string, info fs.FileInfo) *language_server_pb.FileInfo { t := language_server_pb.FileInfo_FILE if info.IsDir() { t = language_server_pb.FileInfo_DIRECTORY } else if info.Mode()&os.ModeSymlink != 0 { t = language_server_pb.FileInfo_SYMLINK } return &language_server_pb.FileInfo{ Path: name, Type: t, Size: info.Size(), ModifiedTime: timestamppb.New(info.ModTime()), } }