100 lines
2.6 KiB
Go
100 lines
2.6 KiB
Go
package lspool
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
)
|
|
|
|
func (i *Instance) callWorkerUnary(ctx context.Context, service, method, mode string, body []byte) ([]byte, error) {
|
|
endpoint, err := i.workerEndpoint("/rpc/unary", service, method, mode)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("X-Worker-Token", i.workerToken)
|
|
if mode == "json" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
} else {
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
}
|
|
|
|
resp, err := i.client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("worker rpc %s/%s: %w", service, method, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("worker rpc read response: %w", err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return respBody, fmt.Errorf("worker rpc %s/%s HTTP %d: %s", service, method, resp.StatusCode, truncate(string(respBody), 200))
|
|
}
|
|
return respBody, nil
|
|
}
|
|
|
|
func (i *Instance) callWorkerStream(ctx context.Context, service, method, mode string, body []byte) (*http.Response, error) {
|
|
endpoint, err := i.workerEndpoint("/rpc/stream", service, method, mode)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("X-Worker-Token", i.workerToken)
|
|
if mode == "json" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
} else {
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
}
|
|
|
|
resp, err := i.client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("worker stream rpc %s/%s: %w", service, method, err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("worker stream rpc %s/%s HTTP %d: %s", service, method, resp.StatusCode, truncate(string(body), 200))
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (i *Instance) workerEndpoint(path, service, method, mode string) (string, error) {
|
|
base := url.URL{
|
|
Scheme: "http",
|
|
Host: i.Address,
|
|
Path: path,
|
|
}
|
|
values := url.Values{}
|
|
values.Set("service", service)
|
|
values.Set("method", method)
|
|
values.Set("mode", mode)
|
|
if i.routingKey != "" {
|
|
values.Set("routing_key", i.routingKey)
|
|
}
|
|
base.RawQuery = values.Encode()
|
|
return base.String(), nil
|
|
}
|
|
|
|
func marshalWorkerJSONBody(input any) ([]byte, error) {
|
|
if input == nil {
|
|
return []byte("{}"), nil
|
|
}
|
|
body, err := json.Marshal(input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return body, nil
|
|
}
|