sub2api/backend/internal/pkg/lspool/remote_instance.go

100 lines
2.6 KiB
Go

package lspool
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
)
// callWorkerUnary performs a single request/response RPC against the worker's
// /rpc/unary endpoint and returns the complete response body.
//
// The request is a POST carrying body, with the instance's worker token in
// the X-Worker-Token header. Content-Type is application/json when mode is
// "json" and application/octet-stream for every other mode.
//
// On a non-200 status the full response body is returned together with an
// error that embeds the status code and a truncated excerpt of that body, so
// callers can still inspect the payload. Transport failures and read failures
// return a nil body.
func (i *Instance) callWorkerUnary(ctx context.Context, service, method, mode string, body []byte) ([]byte, error) {
	target, err := i.workerEndpoint("/rpc/unary", service, method, mode)
	if err != nil {
		return nil, err
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, target, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}

	// Anything that is not explicitly JSON is sent as opaque bytes.
	contentType := "application/octet-stream"
	if mode == "json" {
		contentType = "application/json"
	}
	httpReq.Header.Set("X-Worker-Token", i.workerToken)
	httpReq.Header.Set("Content-Type", contentType)

	httpResp, err := i.client.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("worker rpc %s/%s: %w", service, method, err)
	}
	defer httpResp.Body.Close()

	payload, err := io.ReadAll(httpResp.Body)
	switch {
	case err != nil:
		return nil, fmt.Errorf("worker rpc read response: %w", err)
	case httpResp.StatusCode != http.StatusOK:
		// The payload is handed back alongside the error on purpose.
		return payload, fmt.Errorf("worker rpc %s/%s HTTP %d: %s", service, method, httpResp.StatusCode, truncate(string(payload), 200))
	default:
		return payload, nil
	}
}
// callWorkerStream starts a streaming RPC against the worker's /rpc/stream
// endpoint and hands the live HTTP response back to the caller.
//
// The request is a POST carrying body, with the instance's worker token in
// the X-Worker-Token header. Content-Type is application/json when mode is
// "json" and application/octet-stream for every other mode.
//
// On HTTP 200 the response is returned with its body still open; the caller
// is responsible for reading and closing resp.Body. On any other status the
// body is closed here and an error embedding the status code and a truncated
// excerpt of the body is returned.
func (i *Instance) callWorkerStream(ctx context.Context, service, method, mode string, body []byte) (*http.Response, error) {
	// Only a short excerpt of an error body ends up in the error message, so
	// never buffer more than this much of it.
	const maxErrBodyBytes = 4 << 10

	endpoint, err := i.workerEndpoint("/rpc/stream", service, method, mode)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Worker-Token", i.workerToken)
	if mode == "json" {
		req.Header.Set("Content-Type", "application/json")
	} else {
		req.Header.Set("Content-Type", "application/octet-stream")
	}

	resp, err := i.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("worker stream rpc %s/%s: %w", service, method, err)
	}
	if resp.StatusCode == http.StatusOK {
		return resp, nil
	}

	defer resp.Body.Close()
	// A streaming endpoint may answer with a very large or long-lived body even
	// on failure; bound the read so the error path cannot stall or exhaust
	// memory. The read error is ignored deliberately: the excerpt is
	// best-effort diagnostics and the HTTP status is the real failure.
	errBody, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBodyBytes))
	return nil, fmt.Errorf("worker stream rpc %s/%s HTTP %d: %s", service, method, resp.StatusCode, truncate(string(errBody), 200))
}
// workerEndpoint builds the plain-http URL of a worker RPC route on
// i.Address.
//
// path becomes the URL path, while service, method and mode are passed as
// query parameters of the same names. A routing_key parameter is added only
// when the instance has a non-empty routing key. The error result is always
// nil.
func (i *Instance) workerEndpoint(path, service, method, mode string) (string, error) {
	query := url.Values{
		"service": {service},
		"method":  {method},
		"mode":    {mode},
	}
	if i.routingKey != "" {
		query.Set("routing_key", i.routingKey)
	}

	endpoint := url.URL{
		Scheme:   "http",
		Host:     i.Address,
		Path:     path,
		RawQuery: query.Encode(),
	}
	return endpoint.String(), nil
}
// marshalWorkerJSONBody encodes input as the JSON body of a worker request.
//
// A nil input yields an empty JSON object ("{}"). Any other value is passed
// to json.Marshal, and an encoding failure is returned unchanged with a nil
// body.
func marshalWorkerJSONBody(input any) ([]byte, error) {
	if input != nil {
		return json.Marshal(input)
	}
	return []byte("{}"), nil
}