package lspool

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// callWorkerUnary POSTs body to the worker's /rpc/unary endpoint and returns
// the complete response body.
//
// mode selects the request Content-Type: "json" sends application/json, any
// other value sends application/octet-stream.
//
// When the worker answers with a status other than 200 OK, the response body
// is returned together with a non-nil error, so callers can still inspect the
// payload the worker sent.
func (i *Instance) callWorkerUnary(ctx context.Context, service, method, mode string, body []byte) ([]byte, error) {
	req, err := i.newWorkerRequest(ctx, "/rpc/unary", service, method, mode, body)
	if err != nil {
		return nil, err
	}

	resp, err := i.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("worker rpc %s/%s: %w", service, method, err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("worker rpc read response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		// The body is intentionally returned alongside the error.
		return respBody, fmt.Errorf("worker rpc %s/%s HTTP %d: %s", service, method, resp.StatusCode, truncate(string(respBody), 200))
	}
	return respBody, nil
}

// callWorkerStream POSTs body to the worker's /rpc/stream endpoint and, on a
// 200 OK answer, returns the response with its body left unread and open.
// The body is not closed here on success, so the caller must close it.
//
// mode selects the request Content-Type: "json" sends application/json, any
// other value sends application/octet-stream.
//
// For any other status the response body is read, closed, and folded into
// the returned error; the response itself is not returned.
func (i *Instance) callWorkerStream(ctx context.Context, service, method, mode string, body []byte) (*http.Response, error) {
	req, err := i.newWorkerRequest(ctx, "/rpc/stream", service, method, mode, body)
	if err != nil {
		return nil, err
	}

	resp, err := i.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("worker stream rpc %s/%s: %w", service, method, err)
	}
	if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		// Best effort: the body only enriches the error message, so a failed
		// or partial read is tolerated and whatever was read is reported.
		errBody, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("worker stream rpc %s/%s HTTP %d: %s", service, method, resp.StatusCode, truncate(string(errBody), 200))
	}
	return resp, nil
}

// newWorkerRequest builds the POST request shared by the unary and streaming
// worker calls: it resolves the endpoint URL for path, attaches body, and sets
// the X-Worker-Token and Content-Type headers.
func (i *Instance) newWorkerRequest(ctx context.Context, path, service, method, mode string, body []byte) (*http.Request, error) {
	endpoint, err := i.workerEndpoint(path, service, method, mode)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}

	req.Header.Set("X-Worker-Token", i.workerToken)
	contentType := "application/octet-stream"
	if mode == "json" {
		contentType = "application/json"
	}
	req.Header.Set("Content-Type", contentType)
	return req, nil
}

// workerEndpoint returns the http URL for path on this instance's Address,
// with service, method and mode encoded as query parameters. A routing_key
// parameter is added only when the instance has a non-empty routing key.
//
// The error result is currently always nil.
func (i *Instance) workerEndpoint(path, service, method, mode string) (string, error) {
	values := url.Values{}
	values.Set("service", service)
	values.Set("method", method)
	values.Set("mode", mode)
	if i.routingKey != "" {
		values.Set("routing_key", i.routingKey)
	}

	base := url.URL{
		Scheme:   "http",
		Host:     i.Address,
		Path:     path,
		RawQuery: values.Encode(),
	}
	return base.String(), nil
}

// marshalWorkerJSONBody encodes input as JSON for use as a worker request
// body. A nil input yields the empty object "{}".
//
// Only an untyped nil is mapped to "{}"; a typed nil pointer held in input is
// handed to json.Marshal, which encodes it as "null".
func marshalWorkerJSONBody(input any) ([]byte, error) {
	if input == nil {
		return []byte("{}"), nil
	}
	return json.Marshal(input)
}