Merge pull request #2776 from mt21625457/fix-http2-bug
[codex] 修复 OpenAI/Codex HTTP/2 响应头超时
This commit is contained in:
commit
a28e8e3d44
@ -680,6 +680,9 @@ type GatewayConfig struct {
|
|||||||
// 等待上游响应头的超时时间(秒),0表示无超时
|
// 等待上游响应头的超时时间(秒),0表示无超时
|
||||||
// 注意:这不影响流式数据传输,只控制等待响应头的时间
|
// 注意:这不影响流式数据传输,只控制等待响应头的时间
|
||||||
ResponseHeaderTimeout int `mapstructure:"response_header_timeout"`
|
ResponseHeaderTimeout int `mapstructure:"response_header_timeout"`
|
||||||
|
// OpenAIResponseHeaderTimeout: OpenAI/Codex 上游等待响应头的超时时间(秒),0表示无超时
|
||||||
|
// OpenAI/Codex 请求可能在上游排队较久;默认不使用通用响应头超时截断。
|
||||||
|
OpenAIResponseHeaderTimeout int `mapstructure:"openai_response_header_timeout"`
|
||||||
// 请求体最大字节数,用于网关请求体大小限制
|
// 请求体最大字节数,用于网关请求体大小限制
|
||||||
MaxBodySize int64 `mapstructure:"max_body_size"`
|
MaxBodySize int64 `mapstructure:"max_body_size"`
|
||||||
// 非流式上游响应体读取上限(字节),用于防止无界读取导致内存放大
|
// 非流式上游响应体读取上限(字节),用于防止无界读取导致内存放大
|
||||||
@ -707,6 +710,8 @@ type GatewayConfig struct {
|
|||||||
OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"`
|
OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"`
|
||||||
// OpenAIWS: OpenAI Responses WebSocket 配置(默认开启,可按需回滚到 HTTP)
|
// OpenAIWS: OpenAI Responses WebSocket 配置(默认开启,可按需回滚到 HTTP)
|
||||||
OpenAIWS GatewayOpenAIWSConfig `mapstructure:"openai_ws"`
|
OpenAIWS GatewayOpenAIWSConfig `mapstructure:"openai_ws"`
|
||||||
|
// OpenAIHTTP2: OpenAI HTTP 上游协议策略(默认启用 HTTP/2,可按代理能力回退 HTTP/1.1)
|
||||||
|
OpenAIHTTP2 GatewayOpenAIHTTP2Config `mapstructure:"openai_http2"`
|
||||||
// ImageConcurrency: 图片生成独立并发限制配置(默认关闭)
|
// ImageConcurrency: 图片生成独立并发限制配置(默认关闭)
|
||||||
ImageConcurrency ImageConcurrencyConfig `mapstructure:"image_concurrency"`
|
ImageConcurrency ImageConcurrencyConfig `mapstructure:"image_concurrency"`
|
||||||
|
|
||||||
@ -785,6 +790,21 @@ type GatewayConfig struct {
|
|||||||
UserMessageQueue UserMessageQueueConfig `mapstructure:"user_message_queue"`
|
UserMessageQueue UserMessageQueueConfig `mapstructure:"user_message_queue"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GatewayOpenAIHTTP2Config OpenAI HTTP 上游协议配置。
|
||||||
|
// 默认启用 HTTP/2;在部分代理不兼容时按策略回退 HTTP/1.1。
|
||||||
|
type GatewayOpenAIHTTP2Config struct {
|
||||||
|
// Enabled: 是否启用 OpenAI HTTP/2 优先策略
|
||||||
|
Enabled bool `mapstructure:"enabled"`
|
||||||
|
// AllowProxyFallbackToHTTP1: HTTP/HTTPS 代理出现明确 H2 兼容错误时,临时回退 HTTP/1.1
|
||||||
|
AllowProxyFallbackToHTTP1 bool `mapstructure:"allow_proxy_fallback_to_http1"`
|
||||||
|
// FallbackErrorThreshold: 回退窗口内累计多少次兼容错误后触发回退
|
||||||
|
FallbackErrorThreshold int `mapstructure:"fallback_error_threshold"`
|
||||||
|
// FallbackWindowSeconds: 统计兼容错误的时间窗口(秒)
|
||||||
|
FallbackWindowSeconds int `mapstructure:"fallback_window_seconds"`
|
||||||
|
// FallbackTTLSeconds: 触发后回退 HTTP/1.1 的持续时间(秒)
|
||||||
|
FallbackTTLSeconds int `mapstructure:"fallback_ttl_seconds"`
|
||||||
|
}
|
||||||
|
|
||||||
// UserMessageQueueConfig 用户消息串行队列配置
|
// UserMessageQueueConfig 用户消息串行队列配置
|
||||||
// 用于 Anthropic OAuth/SetupToken 账号的用户消息串行化发送
|
// 用于 Anthropic OAuth/SetupToken 账号的用户消息串行化发送
|
||||||
type UserMessageQueueConfig struct {
|
type UserMessageQueueConfig struct {
|
||||||
@ -1743,6 +1763,7 @@ func setDefaults() {
|
|||||||
|
|
||||||
// Gateway
|
// Gateway
|
||||||
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久
|
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久
|
||||||
|
viper.SetDefault("gateway.openai_response_header_timeout", 0)
|
||||||
viper.SetDefault("gateway.log_upstream_error_body", true)
|
viper.SetDefault("gateway.log_upstream_error_body", true)
|
||||||
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
|
viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048)
|
||||||
viper.SetDefault("gateway.inject_beta_for_apikey", false)
|
viper.SetDefault("gateway.inject_beta_for_apikey", false)
|
||||||
@ -1798,6 +1819,12 @@ func setDefaults() {
|
|||||||
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.queue", 0.7)
|
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.queue", 0.7)
|
||||||
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.error_rate", 0.8)
|
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.error_rate", 0.8)
|
||||||
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.ttft", 0.5)
|
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.ttft", 0.5)
|
||||||
|
// OpenAI HTTP upstream protocol strategy
|
||||||
|
viper.SetDefault("gateway.openai_http2.enabled", true)
|
||||||
|
viper.SetDefault("gateway.openai_http2.allow_proxy_fallback_to_http1", true)
|
||||||
|
viper.SetDefault("gateway.openai_http2.fallback_error_threshold", 2)
|
||||||
|
viper.SetDefault("gateway.openai_http2.fallback_window_seconds", 60)
|
||||||
|
viper.SetDefault("gateway.openai_http2.fallback_ttl_seconds", 600)
|
||||||
viper.SetDefault("gateway.image_concurrency.enabled", false)
|
viper.SetDefault("gateway.image_concurrency.enabled", false)
|
||||||
viper.SetDefault("gateway.image_concurrency.max_concurrent_requests", 0)
|
viper.SetDefault("gateway.image_concurrency.max_concurrent_requests", 0)
|
||||||
viper.SetDefault("gateway.image_concurrency.overflow_mode", ImageConcurrencyOverflowModeReject)
|
viper.SetDefault("gateway.image_concurrency.overflow_mode", ImageConcurrencyOverflowModeReject)
|
||||||
@ -2365,6 +2392,12 @@ func (c *Config) Validate() error {
|
|||||||
if c.Gateway.ProxyProbeResponseReadMaxBytes <= 0 {
|
if c.Gateway.ProxyProbeResponseReadMaxBytes <= 0 {
|
||||||
return fmt.Errorf("gateway.proxy_probe_response_read_max_bytes must be positive")
|
return fmt.Errorf("gateway.proxy_probe_response_read_max_bytes must be positive")
|
||||||
}
|
}
|
||||||
|
if c.Gateway.ResponseHeaderTimeout < 0 {
|
||||||
|
return fmt.Errorf("gateway.response_header_timeout must be non-negative")
|
||||||
|
}
|
||||||
|
if c.Gateway.OpenAIResponseHeaderTimeout < 0 {
|
||||||
|
return fmt.Errorf("gateway.openai_response_header_timeout must be non-negative")
|
||||||
|
}
|
||||||
if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" {
|
if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" {
|
||||||
switch c.Gateway.ConnectionPoolIsolation {
|
switch c.Gateway.ConnectionPoolIsolation {
|
||||||
case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy:
|
case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy:
|
||||||
@ -2539,6 +2572,15 @@ func (c *Config) Validate() error {
|
|||||||
if c.Gateway.OpenAIWS.StickyPreviousResponseTTLSeconds < 0 {
|
if c.Gateway.OpenAIWS.StickyPreviousResponseTTLSeconds < 0 {
|
||||||
return fmt.Errorf("gateway.openai_ws.sticky_previous_response_ttl_seconds must be non-negative")
|
return fmt.Errorf("gateway.openai_ws.sticky_previous_response_ttl_seconds must be non-negative")
|
||||||
}
|
}
|
||||||
|
if c.Gateway.OpenAIHTTP2.FallbackErrorThreshold < 0 {
|
||||||
|
return fmt.Errorf("gateway.openai_http2.fallback_error_threshold must be non-negative")
|
||||||
|
}
|
||||||
|
if c.Gateway.OpenAIHTTP2.FallbackWindowSeconds < 0 {
|
||||||
|
return fmt.Errorf("gateway.openai_http2.fallback_window_seconds must be non-negative")
|
||||||
|
}
|
||||||
|
if c.Gateway.OpenAIHTTP2.FallbackTTLSeconds < 0 {
|
||||||
|
return fmt.Errorf("gateway.openai_http2.fallback_ttl_seconds must be non-negative")
|
||||||
|
}
|
||||||
if c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority < 0 ||
|
if c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority < 0 ||
|
||||||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Load < 0 ||
|
c.Gateway.OpenAIWS.SchedulerScoreWeights.Load < 0 ||
|
||||||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 ||
|
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 ||
|
||||||
|
|||||||
@ -163,6 +163,41 @@ func TestLoadDefaultOpenAIWSConfig(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLoadDefaultOpenAIHTTP2Enabled(t *testing.T) {
|
||||||
|
resetViperWithJWTSecret(t)
|
||||||
|
|
||||||
|
cfg, err := Load()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, cfg.Gateway.OpenAIHTTP2.Enabled)
|
||||||
|
require.True(t, cfg.Gateway.OpenAIHTTP2.AllowProxyFallbackToHTTP1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadOpenAIHTTP2DisabledFromEnv(t *testing.T) {
|
||||||
|
resetViperWithJWTSecret(t)
|
||||||
|
t.Setenv("GATEWAY_OPENAI_HTTP2_ENABLED", "false")
|
||||||
|
|
||||||
|
cfg, err := Load()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, cfg.Gateway.OpenAIHTTP2.Enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadDefaultOpenAIResponseHeaderTimeoutUnlimited(t *testing.T) {
|
||||||
|
resetViperWithJWTSecret(t)
|
||||||
|
|
||||||
|
cfg, err := Load()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, cfg.Gateway.OpenAIResponseHeaderTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadOpenAIResponseHeaderTimeoutFromEnv(t *testing.T) {
|
||||||
|
resetViperWithJWTSecret(t)
|
||||||
|
t.Setenv("GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT", "1800")
|
||||||
|
|
||||||
|
cfg, err := Load()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1800, cfg.Gateway.OpenAIResponseHeaderTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
func TestLoadOpenAIWSStickyTTLCompatibility(t *testing.T) {
|
func TestLoadOpenAIWSStickyTTLCompatibility(t *testing.T) {
|
||||||
resetViperWithJWTSecret(t)
|
resetViperWithJWTSecret(t)
|
||||||
t.Setenv("GATEWAY_OPENAI_WS_STICKY_RESPONSE_ID_TTL_SECONDS", "0")
|
t.Setenv("GATEWAY_OPENAI_WS_STICKY_RESPONSE_ID_TTL_SECONDS", "0")
|
||||||
@ -1220,6 +1255,16 @@ func TestValidateConfigErrors(t *testing.T) {
|
|||||||
mutate: func(c *Config) { c.Gateway.MaxBodySize = 0 },
|
mutate: func(c *Config) { c.Gateway.MaxBodySize = 0 },
|
||||||
wantErr: "gateway.max_body_size",
|
wantErr: "gateway.max_body_size",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "gateway response header timeout",
|
||||||
|
mutate: func(c *Config) { c.Gateway.ResponseHeaderTimeout = -1 },
|
||||||
|
wantErr: "gateway.response_header_timeout",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "gateway openai response header timeout",
|
||||||
|
mutate: func(c *Config) { c.Gateway.OpenAIResponseHeaderTimeout = -1 },
|
||||||
|
wantErr: "gateway.openai_response_header_timeout",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "gateway max idle conns",
|
name: "gateway max idle conns",
|
||||||
mutate: func(c *Config) { c.Gateway.MaxIdleConns = 0 },
|
mutate: func(c *Config) { c.Gateway.MaxIdleConns = 0 },
|
||||||
@ -1275,6 +1320,21 @@ func TestValidateConfigErrors(t *testing.T) {
|
|||||||
mutate: func(c *Config) { c.Gateway.OpenAIWS.APIKeyMaxConnsFactor = 0 },
|
mutate: func(c *Config) { c.Gateway.OpenAIWS.APIKeyMaxConnsFactor = 0 },
|
||||||
wantErr: "gateway.openai_ws.apikey_max_conns_factor",
|
wantErr: "gateway.openai_ws.apikey_max_conns_factor",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "gateway openai http2 fallback threshold",
|
||||||
|
mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackErrorThreshold = -1 },
|
||||||
|
wantErr: "gateway.openai_http2.fallback_error_threshold",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "gateway openai http2 fallback window",
|
||||||
|
mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackWindowSeconds = -1 },
|
||||||
|
wantErr: "gateway.openai_http2.fallback_window_seconds",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "gateway openai http2 fallback ttl",
|
||||||
|
mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackTTLSeconds = -1 },
|
||||||
|
wantErr: "gateway.openai_http2.fallback_ttl_seconds",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "gateway stream data interval range",
|
name: "gateway stream data interval range",
|
||||||
mutate: func(c *Config) { c.Gateway.StreamDataIntervalTimeout = 5 },
|
mutate: func(c *Config) { c.Gateway.StreamDataIntervalTimeout = 5 },
|
||||||
|
|||||||
@ -3,6 +3,8 @@ package repository
|
|||||||
import (
|
import (
|
||||||
"compress/flate"
|
"compress/flate"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -49,6 +51,17 @@ const (
|
|||||||
defaultMaxUpstreamClients = 5000
|
defaultMaxUpstreamClients = 5000
|
||||||
// defaultClientIdleTTLSeconds: 默认客户端空闲回收阈值(15分钟)
|
// defaultClientIdleTTLSeconds: 默认客户端空闲回收阈值(15分钟)
|
||||||
defaultClientIdleTTLSeconds = 900
|
defaultClientIdleTTLSeconds = 900
|
||||||
|
// OpenAI HTTP/2 代理回退策略默认值
|
||||||
|
defaultOpenAIHTTP2FallbackErrorThreshold = 2
|
||||||
|
defaultOpenAIHTTP2FallbackWindow = 60 * time.Second
|
||||||
|
defaultOpenAIHTTP2FallbackTTL = 10 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
upstreamProtocolModeDefault = "default"
|
||||||
|
upstreamProtocolModeOpenAIH1 = "openai_h1"
|
||||||
|
upstreamProtocolModeOpenAIH2 = "openai_h2"
|
||||||
|
upstreamProtocolModeOpenAIH1Fallback = "openai_h1_fallback"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errUpstreamClientLimitReached = errors.New("upstream client cache limit reached")
|
var errUpstreamClientLimitReached = errors.New("upstream client cache limit reached")
|
||||||
@ -63,14 +76,30 @@ type poolSettings struct {
|
|||||||
responseHeaderTimeout time.Duration // 等待响应头超时时间
|
responseHeaderTimeout time.Duration // 等待响应头超时时间
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type openAIHTTP2Settings struct {
|
||||||
|
enabled bool
|
||||||
|
allowProxyFallbackToHTTP1 bool
|
||||||
|
fallbackErrorThreshold int
|
||||||
|
fallbackWindow time.Duration
|
||||||
|
fallbackTTL time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
// upstreamClientEntry 上游客户端缓存条目
|
// upstreamClientEntry 上游客户端缓存条目
|
||||||
// 记录客户端实例及其元数据,用于连接池管理和淘汰策略
|
// 记录客户端实例及其元数据,用于连接池管理和淘汰策略
|
||||||
type upstreamClientEntry struct {
|
type upstreamClientEntry struct {
|
||||||
client *http.Client // HTTP 客户端实例
|
client *http.Client // HTTP 客户端实例
|
||||||
proxyKey string // 代理标识(用于检测代理变更)
|
proxyKey string // 代理标识(用于检测代理变更)
|
||||||
poolKey string // 连接池配置标识(用于检测配置变更)
|
poolKey string // 连接池配置标识(用于检测配置变更)
|
||||||
lastUsed int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰
|
protocolMode string // 协议模式(default/openai_h1/openai_h2/openai_h1_fallback)
|
||||||
inFlight int64 // 当前进行中的请求数,>0 时不可淘汰
|
lastUsed int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰
|
||||||
|
inFlight int64 // 当前进行中的请求数,>0 时不可淘汰
|
||||||
|
}
|
||||||
|
|
||||||
|
type openAIHTTP2FallbackState struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
windowStart time.Time
|
||||||
|
errorCount int
|
||||||
|
fallbackUntil time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// httpUpstreamService 通用 HTTP 上游服务
|
// httpUpstreamService 通用 HTTP 上游服务
|
||||||
@ -94,6 +123,8 @@ type httpUpstreamService struct {
|
|||||||
cfg *config.Config // 全局配置
|
cfg *config.Config // 全局配置
|
||||||
mu sync.RWMutex // 保护 clients map 的读写锁
|
mu sync.RWMutex // 保护 clients map 的读写锁
|
||||||
clients map[string]*upstreamClientEntry // 客户端缓存池,key 由隔离策略决定
|
clients map[string]*upstreamClientEntry // 客户端缓存池,key 由隔离策略决定
|
||||||
|
// OpenAI 走 HTTP/HTTPS 代理时的 H2->H1 回退状态(key=标准化 proxyKey)
|
||||||
|
openAIHTTP2Fallbacks sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPUpstream 创建通用 HTTP 上游服务
|
// NewHTTPUpstream 创建通用 HTTP 上游服务
|
||||||
@ -131,9 +162,13 @@ func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID i
|
|||||||
if err := s.validateRequestHost(req); err != nil {
|
if err := s.validateRequestHost(req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
profile := service.HTTPUpstreamProfileDefault
|
||||||
|
if req != nil {
|
||||||
|
profile = service.HTTPUpstreamProfileFromContext(req.Context())
|
||||||
|
}
|
||||||
|
|
||||||
// 获取或创建对应的客户端,并标记请求占用
|
// 获取或创建对应的客户端,并标记请求占用
|
||||||
entry, err := s.acquireClient(proxyURL, accountID, accountConcurrency)
|
entry, err := s.acquireClientWithProfile(proxyURL, accountID, accountConcurrency, profile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -141,11 +176,13 @@ func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID i
|
|||||||
// 执行请求
|
// 执行请求
|
||||||
resp, err := entry.client.Do(req)
|
resp, err := entry.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.recordOpenAIHTTP2Failure(profile, entry.protocolMode, entry.proxyKey, err)
|
||||||
// 请求失败,立即减少计数
|
// 请求失败,立即减少计数
|
||||||
atomic.AddInt64(&entry.inFlight, -1)
|
atomic.AddInt64(&entry.inFlight, -1)
|
||||||
atomic.StoreInt64(&entry.lastUsed, time.Now().UnixNano())
|
atomic.StoreInt64(&entry.lastUsed, time.Now().UnixNano())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
s.recordOpenAIHTTP2Success(profile, entry.protocolMode, entry.proxyKey)
|
||||||
|
|
||||||
// 如果上游返回了压缩内容,解压后再交给业务层
|
// 如果上游返回了压缩内容,解压后再交给业务层
|
||||||
decompressResponseBody(resp)
|
decompressResponseBody(resp)
|
||||||
@ -168,6 +205,10 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco
|
|||||||
if profile == nil {
|
if profile == nil {
|
||||||
return s.Do(req, proxyURL, accountID, accountConcurrency)
|
return s.Do(req, proxyURL, accountID, accountConcurrency)
|
||||||
}
|
}
|
||||||
|
upstreamProfile := service.HTTPUpstreamProfileDefault
|
||||||
|
if req != nil {
|
||||||
|
upstreamProfile = service.HTTPUpstreamProfileFromContext(req.Context())
|
||||||
|
}
|
||||||
|
|
||||||
targetHost := ""
|
targetHost := ""
|
||||||
if req != nil && req.URL != nil {
|
if req != nil && req.URL != nil {
|
||||||
@ -183,7 +224,7 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
entry, err := s.acquireClientWithTLS(proxyURL, accountID, accountConcurrency, profile)
|
entry, err := s.acquireClientWithTLS(proxyURL, accountID, accountConcurrency, profile, upstreamProfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Debug("tls_fingerprint_acquire_client_failed", "account_id", accountID, "error", err)
|
slog.Debug("tls_fingerprint_acquire_client_failed", "account_id", accountID, "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -208,21 +249,23 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco
|
|||||||
}
|
}
|
||||||
|
|
||||||
// acquireClientWithTLS 获取或创建带 TLS 指纹的客户端
|
// acquireClientWithTLS 获取或创建带 TLS 指纹的客户端
|
||||||
func (s *httpUpstreamService) acquireClientWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile) (*upstreamClientEntry, error) {
|
func (s *httpUpstreamService) acquireClientWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, upstreamProfile service.HTTPUpstreamProfile) (*upstreamClientEntry, error) {
|
||||||
return s.getClientEntryWithTLS(proxyURL, accountID, accountConcurrency, profile, true, true)
|
return s.getClientEntryWithTLS(proxyURL, accountID, accountConcurrency, profile, upstreamProfile, true, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getClientEntryWithTLS 获取或创建带 TLS 指纹的客户端条目
|
// getClientEntryWithTLS 获取或创建带 TLS 指纹的客户端条目
|
||||||
// TLS 指纹客户端使用独立的缓存键,与普通客户端隔离
|
// TLS 指纹客户端使用独立的缓存键,与普通客户端隔离
|
||||||
func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) {
|
func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, upstreamProfile service.HTTPUpstreamProfile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) {
|
||||||
isolation := s.getIsolationMode()
|
isolation := s.getIsolationMode()
|
||||||
proxyKey, parsedProxy, err := normalizeProxyURL(proxyURL)
|
proxyKey, parsedProxy, err := normalizeProxyURL(proxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
settings := s.resolvePoolSettings(isolation, accountConcurrency)
|
||||||
|
settings = s.applyProfilePoolSettings(settings, upstreamProfile)
|
||||||
// TLS 指纹客户端使用独立的缓存键,加 "tls:" 前缀
|
// TLS 指纹客户端使用独立的缓存键,加 "tls:" 前缀
|
||||||
cacheKey := "tls:" + buildCacheKey(isolation, proxyKey, accountID)
|
cacheKey := "tls:" + buildCacheKey(isolation, proxyKey, accountID, upstreamProtocolModeDefault)
|
||||||
poolKey := s.buildPoolKey(isolation, accountConcurrency) + ":tls"
|
poolKey := buildPoolKey(settings, upstreamProtocolModeDefault) + ":tls"
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
nowUnix := now.UnixNano()
|
nowUnix := now.UnixNano()
|
||||||
@ -273,7 +316,6 @@ func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID i
|
|||||||
|
|
||||||
// 创建带 TLS 指纹的 Transport
|
// 创建带 TLS 指纹的 Transport
|
||||||
slog.Debug("tls_fingerprint_creating_new_client", "account_id", accountID, "cache_key", cacheKey, "proxy", proxyKey)
|
slog.Debug("tls_fingerprint_creating_new_client", "account_id", accountID, "cache_key", cacheKey, "proxy", proxyKey)
|
||||||
settings := s.resolvePoolSettings(isolation, accountConcurrency)
|
|
||||||
transport, err := buildUpstreamTransportWithTLSFingerprint(settings, parsedProxy, profile)
|
transport, err := buildUpstreamTransportWithTLSFingerprint(settings, parsedProxy, profile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
@ -339,7 +381,12 @@ func (s *httpUpstreamService) redirectChecker(req *http.Request, via []*http.Req
|
|||||||
// acquireClient 获取或创建客户端,并标记为进行中请求
|
// acquireClient 获取或创建客户端,并标记为进行中请求
|
||||||
// 用于请求路径,避免在获取后被淘汰
|
// 用于请求路径,避免在获取后被淘汰
|
||||||
func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) {
|
func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) {
|
||||||
return s.getClientEntry(proxyURL, accountID, accountConcurrency, true, true)
|
return s.acquireClientWithProfile(proxyURL, accountID, accountConcurrency, service.HTTPUpstreamProfileDefault)
|
||||||
|
}
|
||||||
|
|
||||||
|
// acquireClientWithProfile 获取或创建客户端,并按请求 profile 选择协议策略。
|
||||||
|
func (s *httpUpstreamService) acquireClientWithProfile(proxyURL string, accountID int64, accountConcurrency int, profile service.HTTPUpstreamProfile) (*upstreamClientEntry, error) {
|
||||||
|
return s.getClientEntry(proxyURL, accountID, accountConcurrency, profile, true, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreateClient 获取或创建客户端
|
// getOrCreateClient 获取或创建客户端
|
||||||
@ -358,13 +405,13 @@ func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, ac
|
|||||||
// - account: 按账户隔离,同一账户共享客户端(代理变更时重建)
|
// - account: 按账户隔离,同一账户共享客户端(代理变更时重建)
|
||||||
// - account_proxy: 按账户+代理组合隔离,最细粒度
|
// - account_proxy: 按账户+代理组合隔离,最细粒度
|
||||||
func (s *httpUpstreamService) getOrCreateClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) {
|
func (s *httpUpstreamService) getOrCreateClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) {
|
||||||
return s.getClientEntry(proxyURL, accountID, accountConcurrency, false, false)
|
return s.getClientEntry(proxyURL, accountID, accountConcurrency, service.HTTPUpstreamProfileDefault, false, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getClientEntry 获取或创建客户端条目
|
// getClientEntry 获取或创建客户端条目
|
||||||
// markInFlight=true 时会标记进行中请求,用于请求路径防止被淘汰
|
// markInFlight=true 时会标记进行中请求,用于请求路径防止被淘汰
|
||||||
// enforceLimit=true 时会限制客户端数量,超限且无法淘汰时返回错误
|
// enforceLimit=true 时会限制客户端数量,超限且无法淘汰时返回错误
|
||||||
func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) {
|
func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, profile service.HTTPUpstreamProfile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) {
|
||||||
// 获取隔离模式
|
// 获取隔离模式
|
||||||
isolation := s.getIsolationMode()
|
isolation := s.getIsolationMode()
|
||||||
// 标准化代理 URL 并解析
|
// 标准化代理 URL 并解析
|
||||||
@ -372,10 +419,14 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// 根据请求 profile(例如 OpenAI)选择协议模式
|
||||||
|
protocolMode := s.resolveProtocolMode(profile, proxyKey, parsedProxy)
|
||||||
|
settings := s.resolvePoolSettings(isolation, accountConcurrency)
|
||||||
|
settings = s.applyProfilePoolSettings(settings, profile)
|
||||||
// 构建缓存键(根据隔离策略不同)
|
// 构建缓存键(根据隔离策略不同)
|
||||||
cacheKey := buildCacheKey(isolation, proxyKey, accountID)
|
cacheKey := buildCacheKey(isolation, proxyKey, accountID, protocolMode)
|
||||||
// 构建连接池配置键(用于检测配置变更)
|
// 构建连接池配置键(用于检测配置变更)
|
||||||
poolKey := s.buildPoolKey(isolation, accountConcurrency)
|
poolKey := buildPoolKey(settings, protocolMode)
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
nowUnix := now.UnixNano()
|
nowUnix := now.UnixNano()
|
||||||
@ -418,8 +469,7 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 缓存未命中或需要重建,创建新客户端
|
// 缓存未命中或需要重建,创建新客户端
|
||||||
settings := s.resolvePoolSettings(isolation, accountConcurrency)
|
transport, err := buildUpstreamTransport(settings, parsedProxy, protocolMode)
|
||||||
transport, err := buildUpstreamTransport(settings, parsedProxy)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
return nil, fmt.Errorf("build transport: %w", err)
|
return nil, fmt.Errorf("build transport: %w", err)
|
||||||
@ -429,9 +479,10 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a
|
|||||||
client.CheckRedirect = s.redirectChecker
|
client.CheckRedirect = s.redirectChecker
|
||||||
}
|
}
|
||||||
entry := &upstreamClientEntry{
|
entry := &upstreamClientEntry{
|
||||||
client: client,
|
client: client,
|
||||||
proxyKey: proxyKey,
|
proxyKey: proxyKey,
|
||||||
poolKey: poolKey,
|
poolKey: poolKey,
|
||||||
|
protocolMode: protocolMode,
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&entry.lastUsed, nowUnix)
|
atomic.StoreInt64(&entry.lastUsed, nowUnix)
|
||||||
if markInFlight {
|
if markInFlight {
|
||||||
@ -615,22 +666,31 @@ func (s *httpUpstreamService) resolvePoolSettings(isolation string, accountConcu
|
|||||||
return settings
|
return settings
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildPoolKey 构建连接池配置键
|
func (s *httpUpstreamService) applyProfilePoolSettings(settings poolSettings, profile service.HTTPUpstreamProfile) poolSettings {
|
||||||
// 用于检测配置变更,配置变更时需要重建客户端
|
if profile != service.HTTPUpstreamProfileOpenAI {
|
||||||
//
|
return settings
|
||||||
// 参数:
|
|
||||||
// - isolation: 隔离模式
|
|
||||||
// - accountConcurrency: 账户并发限制
|
|
||||||
//
|
|
||||||
// 返回:
|
|
||||||
// - string: 配置键
|
|
||||||
func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency int) string {
|
|
||||||
if isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy {
|
|
||||||
if accountConcurrency > 0 {
|
|
||||||
return fmt.Sprintf("account:%d", accountConcurrency)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return "default"
|
settings.responseHeaderTimeout = 0
|
||||||
|
if s != nil && s.cfg != nil && s.cfg.Gateway.OpenAIResponseHeaderTimeout > 0 {
|
||||||
|
settings.responseHeaderTimeout = time.Duration(s.cfg.Gateway.OpenAIResponseHeaderTimeout) * time.Second
|
||||||
|
}
|
||||||
|
return settings
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildPoolKey 构建连接池配置键,用于检测连接池配置变更。
|
||||||
|
func buildPoolKey(settings poolSettings, protocolMode string) string {
|
||||||
|
base := fmt.Sprintf(
|
||||||
|
"idle:%d|idle_host:%d|max:%d|idle_timeout:%s|header_timeout:%s",
|
||||||
|
settings.maxIdleConns,
|
||||||
|
settings.maxIdleConnsPerHost,
|
||||||
|
settings.maxConnsPerHost,
|
||||||
|
settings.idleConnTimeout,
|
||||||
|
settings.responseHeaderTimeout,
|
||||||
|
)
|
||||||
|
if protocolMode == "" || protocolMode == upstreamProtocolModeDefault {
|
||||||
|
return base
|
||||||
|
}
|
||||||
|
return base + "|proto:" + protocolMode
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildCacheKey 构建客户端缓存键
|
// buildCacheKey 构建客户端缓存键
|
||||||
@ -648,15 +708,245 @@ func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency
|
|||||||
// - proxy 模式: "proxy:{proxyKey}"
|
// - proxy 模式: "proxy:{proxyKey}"
|
||||||
// - account 模式: "account:{accountID}"
|
// - account 模式: "account:{accountID}"
|
||||||
// - account_proxy 模式: "account:{accountID}|proxy:{proxyKey}"
|
// - account_proxy 模式: "account:{accountID}|proxy:{proxyKey}"
|
||||||
func buildCacheKey(isolation, proxyKey string, accountID int64) string {
|
func buildCacheKey(isolation, proxyKey string, accountID int64, protocolMode string) string {
|
||||||
|
var base string
|
||||||
switch isolation {
|
switch isolation {
|
||||||
case config.ConnectionPoolIsolationAccount:
|
case config.ConnectionPoolIsolationAccount:
|
||||||
return fmt.Sprintf("account:%d", accountID)
|
base = fmt.Sprintf("account:%d", accountID)
|
||||||
case config.ConnectionPoolIsolationAccountProxy:
|
case config.ConnectionPoolIsolationAccountProxy:
|
||||||
return fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey)
|
base = fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey)
|
||||||
default:
|
default:
|
||||||
return fmt.Sprintf("proxy:%s", proxyKey)
|
base = fmt.Sprintf("proxy:%s", proxyKey)
|
||||||
}
|
}
|
||||||
|
if protocolMode != "" && protocolMode != upstreamProtocolModeDefault {
|
||||||
|
base += "|proto:" + protocolMode
|
||||||
|
}
|
||||||
|
return base
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) resolveOpenAIHTTP2Settings() openAIHTTP2Settings {
|
||||||
|
settings := openAIHTTP2Settings{
|
||||||
|
enabled: false,
|
||||||
|
allowProxyFallbackToHTTP1: true,
|
||||||
|
fallbackErrorThreshold: defaultOpenAIHTTP2FallbackErrorThreshold,
|
||||||
|
fallbackWindow: defaultOpenAIHTTP2FallbackWindow,
|
||||||
|
fallbackTTL: defaultOpenAIHTTP2FallbackTTL,
|
||||||
|
}
|
||||||
|
if s == nil || s.cfg == nil {
|
||||||
|
return settings
|
||||||
|
}
|
||||||
|
cfg := s.cfg.Gateway.OpenAIHTTP2
|
||||||
|
settings.enabled = cfg.Enabled
|
||||||
|
settings.allowProxyFallbackToHTTP1 = cfg.AllowProxyFallbackToHTTP1
|
||||||
|
if cfg.FallbackErrorThreshold > 0 {
|
||||||
|
settings.fallbackErrorThreshold = cfg.FallbackErrorThreshold
|
||||||
|
}
|
||||||
|
if cfg.FallbackWindowSeconds > 0 {
|
||||||
|
settings.fallbackWindow = time.Duration(cfg.FallbackWindowSeconds) * time.Second
|
||||||
|
}
|
||||||
|
if cfg.FallbackTTLSeconds > 0 {
|
||||||
|
settings.fallbackTTL = time.Duration(cfg.FallbackTTLSeconds) * time.Second
|
||||||
|
}
|
||||||
|
return settings
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) resolveProtocolMode(profile service.HTTPUpstreamProfile, proxyKey string, parsedProxy *url.URL) string {
|
||||||
|
if profile != service.HTTPUpstreamProfileOpenAI {
|
||||||
|
return upstreamProtocolModeDefault
|
||||||
|
}
|
||||||
|
settings := s.resolveOpenAIHTTP2Settings()
|
||||||
|
if !settings.enabled {
|
||||||
|
return upstreamProtocolModeOpenAIH1
|
||||||
|
}
|
||||||
|
if parsedProxy == nil {
|
||||||
|
return upstreamProtocolModeOpenAIH2
|
||||||
|
}
|
||||||
|
scheme := strings.ToLower(parsedProxy.Scheme)
|
||||||
|
if scheme != "http" && scheme != "https" {
|
||||||
|
return upstreamProtocolModeOpenAIH2
|
||||||
|
}
|
||||||
|
if settings.allowProxyFallbackToHTTP1 && s.isOpenAIHTTP2FallbackActive(proxyKey) {
|
||||||
|
return upstreamProtocolModeOpenAIH1Fallback
|
||||||
|
}
|
||||||
|
return upstreamProtocolModeOpenAIH2
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) isOpenAIHTTP2FallbackActive(proxyKey string) bool {
|
||||||
|
raw, ok := s.openAIHTTP2Fallbacks.Load(proxyKey)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
state, ok := raw.(*openAIHTTP2FallbackState)
|
||||||
|
if !ok || state == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return state.isFallbackActive(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) getOrCreateOpenAIHTTP2FallbackState(proxyKey string) *openAIHTTP2FallbackState {
|
||||||
|
state := &openAIHTTP2FallbackState{}
|
||||||
|
actual, _ := s.openAIHTTP2Fallbacks.LoadOrStore(proxyKey, state)
|
||||||
|
cached, ok := actual.(*openAIHTTP2FallbackState)
|
||||||
|
if !ok || cached == nil {
|
||||||
|
return state
|
||||||
|
}
|
||||||
|
return cached
|
||||||
|
}
|
||||||
|
|
||||||
|
func isHTTPProxyKey(proxyKey string) bool {
|
||||||
|
return strings.HasPrefix(proxyKey, "http://") || strings.HasPrefix(proxyKey, "https://")
|
||||||
|
}
|
||||||
|
|
||||||
|
func isOpenAIHTTP2CompatibilityError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if isUpstreamTimeoutError(err) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
msg := strings.ToLower(err.Error())
|
||||||
|
if msg == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
markers := []string{
|
||||||
|
"alpn",
|
||||||
|
"no application protocol",
|
||||||
|
"protocol error",
|
||||||
|
"stream error",
|
||||||
|
"goaway",
|
||||||
|
"refused_stream",
|
||||||
|
"frame too large",
|
||||||
|
}
|
||||||
|
for _, marker := range markers {
|
||||||
|
if strings.Contains(msg, marker) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func isUpstreamTimeoutError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
var netErr net.Error
|
||||||
|
if errors.As(err, &netErr) && netErr.Timeout() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
msg := strings.ToLower(err.Error())
|
||||||
|
if msg == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
timeoutMarkers := []string{
|
||||||
|
"timeout awaiting response headers",
|
||||||
|
"i/o timeout",
|
||||||
|
"context deadline exceeded",
|
||||||
|
"client.timeout exceeded while awaiting headers",
|
||||||
|
"tls handshake timeout",
|
||||||
|
}
|
||||||
|
for _, marker := range timeoutMarkers {
|
||||||
|
if strings.Contains(msg, marker) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) recordOpenAIHTTP2Failure(profile service.HTTPUpstreamProfile, protocolMode, proxyKey string, err error) {
|
||||||
|
if profile != service.HTTPUpstreamProfileOpenAI || protocolMode != upstreamProtocolModeOpenAIH2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
settings := s.resolveOpenAIHTTP2Settings()
|
||||||
|
if !settings.enabled || !settings.allowProxyFallbackToHTTP1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !isHTTPProxyKey(proxyKey) || !isOpenAIHTTP2CompatibilityError(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
state := s.getOrCreateOpenAIHTTP2FallbackState(proxyKey)
|
||||||
|
activated, until := state.recordFailure(time.Now(), settings.fallbackErrorThreshold, settings.fallbackWindow, settings.fallbackTTL)
|
||||||
|
if activated {
|
||||||
|
slog.Warn("openai_http2_proxy_fallback_activated",
|
||||||
|
"proxy", proxyKey,
|
||||||
|
"fallback_until", until.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *httpUpstreamService) recordOpenAIHTTP2Success(profile service.HTTPUpstreamProfile, protocolMode, proxyKey string) {
|
||||||
|
if profile != service.HTTPUpstreamProfileOpenAI || protocolMode != upstreamProtocolModeOpenAIH2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !isHTTPProxyKey(proxyKey) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
raw, ok := s.openAIHTTP2Fallbacks.Load(proxyKey)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
state, ok := raw.(*openAIHTTP2FallbackState)
|
||||||
|
if !ok || state == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
state.resetErrorWindow()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *openAIHTTP2FallbackState) isFallbackActive(now time.Time) bool {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.fallbackUntil.IsZero() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if now.Before(s.fallbackUntil) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
s.fallbackUntil = time.Time{}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *openAIHTTP2FallbackState) resetErrorWindow() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.windowStart = time.Time{}
|
||||||
|
s.errorCount = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *openAIHTTP2FallbackState) recordFailure(now time.Time, threshold int, window, ttl time.Duration) (bool, time.Time) {
|
||||||
|
if threshold <= 0 {
|
||||||
|
threshold = defaultOpenAIHTTP2FallbackErrorThreshold
|
||||||
|
}
|
||||||
|
if window <= 0 {
|
||||||
|
window = defaultOpenAIHTTP2FallbackWindow
|
||||||
|
}
|
||||||
|
if ttl <= 0 {
|
||||||
|
ttl = defaultOpenAIHTTP2FallbackTTL
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if !s.fallbackUntil.IsZero() && now.Before(s.fallbackUntil) {
|
||||||
|
return false, s.fallbackUntil
|
||||||
|
}
|
||||||
|
if !s.fallbackUntil.IsZero() && !now.Before(s.fallbackUntil) {
|
||||||
|
s.fallbackUntil = time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.windowStart.IsZero() || now.Sub(s.windowStart) > window {
|
||||||
|
s.windowStart = now
|
||||||
|
s.errorCount = 0
|
||||||
|
}
|
||||||
|
s.errorCount++
|
||||||
|
if s.errorCount < threshold {
|
||||||
|
return false, time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.fallbackUntil = now.Add(ttl)
|
||||||
|
s.windowStart = time.Time{}
|
||||||
|
s.errorCount = 0
|
||||||
|
return true, s.fallbackUntil
|
||||||
}
|
}
|
||||||
|
|
||||||
// normalizeProxyURL 标准化代理 URL
|
// normalizeProxyURL 标准化代理 URL
|
||||||
@ -728,7 +1018,7 @@ func defaultPoolSettings(cfg *config.Config) poolSettings {
|
|||||||
if cfg.Gateway.IdleConnTimeoutSeconds > 0 {
|
if cfg.Gateway.IdleConnTimeoutSeconds > 0 {
|
||||||
idleConnTimeout = time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second
|
idleConnTimeout = time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second
|
||||||
}
|
}
|
||||||
if cfg.Gateway.ResponseHeaderTimeout > 0 {
|
if cfg.Gateway.ResponseHeaderTimeout >= 0 {
|
||||||
responseHeaderTimeout = time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
responseHeaderTimeout = time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -759,7 +1049,7 @@ func defaultPoolSettings(cfg *config.Config) poolSettings {
|
|||||||
// - MaxConnsPerHost: 每主机最大连接数(达到后新请求等待)
|
// - MaxConnsPerHost: 每主机最大连接数(达到后新请求等待)
|
||||||
// - IdleConnTimeout: 空闲连接超时(超时后关闭)
|
// - IdleConnTimeout: 空闲连接超时(超时后关闭)
|
||||||
// - ResponseHeaderTimeout: 等待响应头超时(不影响流式传输)
|
// - ResponseHeaderTimeout: 等待响应头超时(不影响流式传输)
|
||||||
func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) (*http.Transport, error) {
|
func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL, protocolMode string) (*http.Transport, error) {
|
||||||
transport := &http.Transport{
|
transport := &http.Transport{
|
||||||
MaxIdleConns: settings.maxIdleConns,
|
MaxIdleConns: settings.maxIdleConns,
|
||||||
MaxIdleConnsPerHost: settings.maxIdleConnsPerHost,
|
MaxIdleConnsPerHost: settings.maxIdleConnsPerHost,
|
||||||
@ -767,6 +1057,17 @@ func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) (*http.Tra
|
|||||||
IdleConnTimeout: settings.idleConnTimeout,
|
IdleConnTimeout: settings.idleConnTimeout,
|
||||||
ResponseHeaderTimeout: settings.responseHeaderTimeout,
|
ResponseHeaderTimeout: settings.responseHeaderTimeout,
|
||||||
}
|
}
|
||||||
|
switch protocolMode {
|
||||||
|
case upstreamProtocolModeOpenAIH2:
|
||||||
|
transport.ForceAttemptHTTP2 = true
|
||||||
|
case upstreamProtocolModeOpenAIH1:
|
||||||
|
transport.ForceAttemptHTTP2 = false
|
||||||
|
transport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
|
||||||
|
case upstreamProtocolModeOpenAIH1Fallback:
|
||||||
|
// 显式禁用 HTTP/2,确保代理不兼容场景回退到 HTTP/1.1。
|
||||||
|
transport.ForceAttemptHTTP2 = false
|
||||||
|
transport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
|
||||||
|
}
|
||||||
if err := proxyutil.ConfigureTransportProxy(transport, proxyURL); err != nil {
|
if err := proxyutil.ConfigureTransportProxy(transport, proxyURL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -45,7 +45,7 @@ func BenchmarkHTTPUpstreamProxyClient(b *testing.B) {
|
|||||||
settings := defaultPoolSettings(cfg)
|
settings := defaultPoolSettings(cfg)
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
// 每次迭代都创建新客户端,包含 Transport 分配
|
// 每次迭代都创建新客户端,包含 Transport 分配
|
||||||
transport, err := buildUpstreamTransport(settings, parsedProxy)
|
transport, err := buildUpstreamTransport(settings, parsedProxy, upstreamProtocolModeDefault)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("创建 Transport 失败: %v", err)
|
b.Fatalf("创建 Transport 失败: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package repository
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -8,6 +9,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/pkg/tlsfingerprint"
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
)
|
)
|
||||||
@ -41,12 +44,23 @@ func (s *HTTPUpstreamSuite) newService() *httpUpstreamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestDefaultResponseHeaderTimeout 测试默认响应头超时配置
|
// TestDefaultResponseHeaderTimeout 测试默认响应头超时配置
|
||||||
// 验证未配置时使用 300 秒默认值
|
// 验证显式 0 会禁用等待响应头超时
|
||||||
func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() {
|
func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() {
|
||||||
svc := s.newService()
|
svc := s.newService()
|
||||||
entry := mustGetOrCreateClient(s.T(), svc, "", 0, 0)
|
entry := mustGetOrCreateClient(s.T(), svc, "", 0, 0)
|
||||||
transport, ok := entry.client.Transport.(*http.Transport)
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
require.True(s.T(), ok, "expected *http.Transport")
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNilConfigResponseHeaderTimeoutFallback 验证 nil 配置使用代码级兜底值。
|
||||||
|
func (s *HTTPUpstreamSuite) TestNilConfigResponseHeaderTimeoutFallback() {
|
||||||
|
up := NewHTTPUpstream(nil)
|
||||||
|
svc, ok := up.(*httpUpstreamService)
|
||||||
|
require.True(s.T(), ok, "expected *httpUpstreamService")
|
||||||
|
entry := mustGetOrCreateClient(s.T(), svc, "", 0, 0)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
require.Equal(s.T(), 300*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch")
|
require.Equal(s.T(), 300*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,10 +79,130 @@ func (s *HTTPUpstreamSuite) TestCustomResponseHeaderTimeout() {
|
|||||||
// 验证解析失败时拒绝回退到直连模式
|
// 验证解析失败时拒绝回退到直连模式
|
||||||
func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLReturnsError() {
|
func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLReturnsError() {
|
||||||
svc := s.newService()
|
svc := s.newService()
|
||||||
_, err := svc.getClientEntry("://bad-proxy-url", 1, 1, false, false)
|
_, err := svc.getClientEntry("://bad-proxy-url", 1, 1, service.HTTPUpstreamProfileDefault, false, false)
|
||||||
require.Error(s.T(), err, "expected error for invalid proxy URL")
|
require.Error(s.T(), err, "expected error for invalid proxy URL")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIProfileDefaultsToHTTP2AndNoHeaderTimeout() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
ResponseHeaderTimeout: 600,
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{
|
||||||
|
Enabled: true,
|
||||||
|
AllowProxyFallbackToHTTP1: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "OpenAI profile should not inherit generic header timeout")
|
||||||
|
require.True(s.T(), transport.ForceAttemptHTTP2, "OpenAI profile should prefer HTTP/2")
|
||||||
|
require.Equal(s.T(), upstreamProtocolModeOpenAIH2, entry.protocolMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIProfileCustomHeaderTimeout() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
ResponseHeaderTimeout: 600,
|
||||||
|
OpenAIResponseHeaderTimeout: 1800,
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{
|
||||||
|
Enabled: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.Equal(s.T(), 1800*time.Second, transport.ResponseHeaderTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIProfileTLSFingerprintDoesNotInheritGenericHeaderTimeout() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
ResponseHeaderTimeout: 600,
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{
|
||||||
|
Enabled: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
entry, err := svc.getClientEntryWithTLS("", 1, 1, &tlsfingerprint.Profile{Name: "test"}, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "OpenAI TLS path should not inherit generic header timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIProfileHTTP2DisabledUsesHTTP1Transport() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{Enabled: false},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.False(s.T(), transport.ForceAttemptHTTP2, "OpenAI HTTP/2 disabled should not force H2")
|
||||||
|
require.NotNil(s.T(), transport.TLSNextProto, "HTTP/1 mode should disable automatic H2 negotiation")
|
||||||
|
require.Equal(s.T(), upstreamProtocolModeOpenAIH1, entry.protocolMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIHeaderTimeoutChangeRebuildsClient() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{Enabled: true},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
entry1, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
|
||||||
|
s.cfg.Gateway.OpenAIResponseHeaderTimeout = 1800
|
||||||
|
entry2, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
require.NotSame(s.T(), entry1, entry2, "OpenAI header timeout changes must rebuild cached client")
|
||||||
|
transport, ok := entry2.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.Equal(s.T(), 1800*time.Second, transport.ResponseHeaderTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIHTTP2TimeoutDoesNotActivateProxyFallback() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{
|
||||||
|
Enabled: true,
|
||||||
|
AllowProxyFallbackToHTTP1: true,
|
||||||
|
FallbackErrorThreshold: 1,
|
||||||
|
FallbackWindowSeconds: 60,
|
||||||
|
FallbackTTLSeconds: 600,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
proxyURL := "http://proxy.local:8080"
|
||||||
|
svc.recordOpenAIHTTP2Failure(service.HTTPUpstreamProfileOpenAI, upstreamProtocolModeOpenAIH2, proxyURL, errors.New("http2: timeout awaiting response headers"))
|
||||||
|
require.False(s.T(), svc.isOpenAIHTTP2FallbackActive(proxyURL), "header timeout should not be treated as H2 compatibility failure")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPUpstreamSuite) TestOpenAIHTTP2ProxyCompatibilityErrorActivatesFallback() {
|
||||||
|
s.cfg.Gateway = config.GatewayConfig{
|
||||||
|
OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{
|
||||||
|
Enabled: true,
|
||||||
|
AllowProxyFallbackToHTTP1: true,
|
||||||
|
FallbackErrorThreshold: 1,
|
||||||
|
FallbackWindowSeconds: 60,
|
||||||
|
FallbackTTLSeconds: 600,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := s.newService()
|
||||||
|
proxyURL := "http://proxy.local:8080"
|
||||||
|
svc.recordOpenAIHTTP2Failure(service.HTTPUpstreamProfileOpenAI, upstreamProtocolModeOpenAIH2, proxyURL, errors.New("http2: protocol error"))
|
||||||
|
require.True(s.T(), svc.isOpenAIHTTP2FallbackActive(proxyURL))
|
||||||
|
|
||||||
|
entry, err := svc.getClientEntry(proxyURL, 1, 1, service.HTTPUpstreamProfileOpenAI, false, false)
|
||||||
|
require.NoError(s.T(), err)
|
||||||
|
transport, ok := entry.client.Transport.(*http.Transport)
|
||||||
|
require.True(s.T(), ok, "expected *http.Transport")
|
||||||
|
require.False(s.T(), transport.ForceAttemptHTTP2)
|
||||||
|
require.NotNil(s.T(), transport.TLSNextProto)
|
||||||
|
require.Equal(s.T(), upstreamProtocolModeOpenAIH1Fallback, entry.protocolMode)
|
||||||
|
}
|
||||||
|
|
||||||
// TestNormalizeProxyURL_Canonicalizes 测试代理 URL 规范化
|
// TestNormalizeProxyURL_Canonicalizes 测试代理 URL 规范化
|
||||||
// 验证等价地址能够映射到同一缓存键
|
// 验证等价地址能够映射到同一缓存键
|
||||||
func (s *HTTPUpstreamSuite) TestNormalizeProxyURL_Canonicalizes() {
|
func (s *HTTPUpstreamSuite) TestNormalizeProxyURL_Canonicalizes() {
|
||||||
|
|||||||
@ -580,6 +580,7 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendErrorAndEnd(c, "Failed to create request")
|
return s.sendErrorAndEnd(c, "Failed to create request")
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
|
|
||||||
// Set common headers
|
// Set common headers
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
@ -659,6 +660,7 @@ func (s *AccountTestService) testOpenAIChatCompletionsConnection(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendErrorAndEnd(c, "Failed to create Chat Completions request")
|
return s.sendErrorAndEnd(c, "Failed to create Chat Completions request")
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
req.Header.Set("Accept", "text/event-stream")
|
req.Header.Set("Accept", "text/event-stream")
|
||||||
req.Header.Set("Authorization", "Bearer "+authToken)
|
req.Header.Set("Authorization", "Bearer "+authToken)
|
||||||
@ -739,6 +741,7 @@ func (s *AccountTestService) testOpenAICompactConnection(c *gin.Context, account
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendErrorAndEnd(c, "Failed to create request")
|
return s.sendErrorAndEnd(c, "Failed to create request")
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
req.Header.Set("Accept", "application/json")
|
req.Header.Set("Accept", "application/json")
|
||||||
@ -1505,6 +1508,7 @@ func (s *AccountTestService) testOpenAIImageAPIKey(c *gin.Context, ctx context.C
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendErrorAndEnd(c, "Failed to create request")
|
return s.sendErrorAndEnd(c, "Failed to create request")
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
req.Header.Set("Authorization", "Bearer "+authToken)
|
req.Header.Set("Authorization", "Bearer "+authToken)
|
||||||
|
|
||||||
@ -1593,6 +1597,7 @@ func (s *AccountTestService) testOpenAIImageOAuth(c *gin.Context, ctx context.Co
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendErrorAndEnd(c, "Failed to create request")
|
return s.sendErrorAndEnd(c, "Failed to create request")
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
req.Host = "chatgpt.com"
|
req.Host = "chatgpt.com"
|
||||||
req.Header.Set("Authorization", "Bearer "+authToken)
|
req.Header.Set("Authorization", "Bearer "+authToken)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|||||||
@ -57,6 +57,7 @@ func TestAccountTestService_TestAccountConnection_OpenAICompactOAuthSuccessPersi
|
|||||||
require.Equal(t, "application/json", upstream.lastReq.Header.Get("Accept"))
|
require.Equal(t, "application/json", upstream.lastReq.Header.Get("Accept"))
|
||||||
require.Equal(t, codexCLIVersion, upstream.lastReq.Header.Get("Version"))
|
require.Equal(t, codexCLIVersion, upstream.lastReq.Header.Get("Version"))
|
||||||
require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id"))
|
require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id"))
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Equal(t, codexCLIUserAgent, upstream.lastReq.Header.Get("User-Agent"))
|
require.Equal(t, codexCLIUserAgent, upstream.lastReq.Header.Get("User-Agent"))
|
||||||
require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id"))
|
require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id"))
|
||||||
require.Equal(t, "gpt-5.4", gjson.GetBytes(upstream.lastBody, "model").String())
|
require.Equal(t, "gpt-5.4", gjson.GetBytes(upstream.lastBody, "model").String())
|
||||||
|
|||||||
@ -45,6 +45,8 @@ func TestAccountTestService_OpenAIImageOAuthHandlesOutputItemDoneFallback(t *tes
|
|||||||
|
|
||||||
err := svc.testOpenAIImageOAuth(c, context.Background(), account, "gpt-image-2", "draw a cat")
|
err := svc.testOpenAIImageOAuth(c, context.Background(), account, "gpt-image-2", "draw a cat")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, upstream.lastReq)
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Contains(t, rec.Body.String(), "Calling Codex /responses image tool")
|
require.Contains(t, rec.Body.String(), "Calling Codex /responses image tool")
|
||||||
require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=")
|
require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=")
|
||||||
require.Contains(t, rec.Body.String(), "\"success\":true")
|
require.Contains(t, rec.Body.String(), "\"success\":true")
|
||||||
@ -83,6 +85,7 @@ func TestAccountTestService_OpenAIImageAPIKeyUsesConfiguredV1BaseURL(t *testing.
|
|||||||
err := svc.testOpenAIImageAPIKey(c, context.Background(), account, "gpt-image-2", "draw a cat")
|
err := svc.testOpenAIImageAPIKey(c, context.Background(), account, "gpt-image-2", "draw a cat")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, upstream.lastReq)
|
require.NotNil(t, upstream.lastReq)
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Equal(t, "https://image-upstream.example/v1/images/generations", upstream.lastReq.URL.String())
|
require.Equal(t, "https://image-upstream.example/v1/images/generations", upstream.lastReq.URL.String())
|
||||||
require.Equal(t, "Bearer test-api-key", upstream.lastReq.Header.Get("Authorization"))
|
require.Equal(t, "Bearer test-api-key", upstream.lastReq.Header.Get("Authorization"))
|
||||||
require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=")
|
require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=")
|
||||||
|
|||||||
@ -129,6 +129,8 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing.
|
|||||||
|
|
||||||
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "")
|
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.Len(t, upstream.requests, 1)
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.requests[0].Context()))
|
||||||
require.NotEmpty(t, repo.updatedExtra)
|
require.NotEmpty(t, repo.updatedExtra)
|
||||||
require.Equal(t, 42.0, repo.updatedExtra["codex_5h_used_percent"])
|
require.Equal(t, 42.0, repo.updatedExtra["codex_5h_used_percent"])
|
||||||
require.Equal(t, 88.0, repo.updatedExtra["codex_7d_used_percent"])
|
require.Equal(t, 88.0, repo.updatedExtra["codex_7d_used_percent"])
|
||||||
@ -372,6 +374,7 @@ func TestAccountTestService_OpenAIAPIKeyResponsesUnsupportedUsesChatCompletionsP
|
|||||||
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "hello", "")
|
err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "hello", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, upstream.lastReq)
|
require.NotNil(t, upstream.lastReq)
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Equal(t, "https://compat-upstream.example/v1/chat/completions", upstream.lastReq.URL.String())
|
require.Equal(t, "https://compat-upstream.example/v1/chat/completions", upstream.lastReq.URL.String())
|
||||||
require.Equal(t, "Bearer sk-test", upstream.lastReq.Header.Get("Authorization"))
|
require.Equal(t, "Bearer sk-test", upstream.lastReq.Header.Get("Authorization"))
|
||||||
require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept"))
|
require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept"))
|
||||||
|
|||||||
42
backend/internal/service/http_upstream_profile.go
Normal file
42
backend/internal/service/http_upstream_profile.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// HTTPUpstreamProfile marks HTTP upstream requests that need provider-specific
|
||||||
|
// transport policy.
|
||||||
|
type HTTPUpstreamProfile string
|
||||||
|
|
||||||
|
const (
|
||||||
|
HTTPUpstreamProfileDefault HTTPUpstreamProfile = ""
|
||||||
|
HTTPUpstreamProfileOpenAI HTTPUpstreamProfile = "openai"
|
||||||
|
)
|
||||||
|
|
||||||
|
type httpUpstreamProfileContextKey struct{}
|
||||||
|
|
||||||
|
// WithHTTPUpstreamProfile injects an upstream transport profile into ctx.
|
||||||
|
func WithHTTPUpstreamProfile(ctx context.Context, profile HTTPUpstreamProfile) context.Context {
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
if profile == HTTPUpstreamProfileDefault {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
return context.WithValue(ctx, httpUpstreamProfileContextKey{}, profile)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HTTPUpstreamProfileFromContext resolves the upstream transport profile from ctx.
|
||||||
|
func HTTPUpstreamProfileFromContext(ctx context.Context) HTTPUpstreamProfile {
|
||||||
|
if ctx == nil {
|
||||||
|
return HTTPUpstreamProfileDefault
|
||||||
|
}
|
||||||
|
profile, ok := ctx.Value(httpUpstreamProfileContextKey{}).(HTTPUpstreamProfile)
|
||||||
|
if !ok {
|
||||||
|
return HTTPUpstreamProfileDefault
|
||||||
|
}
|
||||||
|
switch profile {
|
||||||
|
case HTTPUpstreamProfileOpenAI:
|
||||||
|
return profile
|
||||||
|
default:
|
||||||
|
return HTTPUpstreamProfileDefault
|
||||||
|
}
|
||||||
|
}
|
||||||
21
backend/internal/service/http_upstream_profile_test.go
Normal file
21
backend/internal/service/http_upstream_profile_test.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWithHTTPUpstreamProfile_DefaultKeepsContext(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
got := WithHTTPUpstreamProfile(ctx, HTTPUpstreamProfileDefault)
|
||||||
|
if got != ctx {
|
||||||
|
t.Fatal("default profile should not wrap context")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWithHTTPUpstreamProfile_OpenAI(t *testing.T) {
|
||||||
|
ctx := WithHTTPUpstreamProfile(context.TODO(), HTTPUpstreamProfileOpenAI)
|
||||||
|
if profile := HTTPUpstreamProfileFromContext(ctx); profile != HTTPUpstreamProfileOpenAI {
|
||||||
|
t.Fatalf("expected profile %q, got %q", HTTPUpstreamProfileOpenAI, profile)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -95,6 +95,7 @@ func (s *AccountTestService) ProbeOpenAIAPIKeyResponsesSupport(ctx context.Conte
|
|||||||
logger.LegacyPrintf("service.openai_probe", "probe_build_request_failed: account_id=%d err=%v", accountID, err)
|
logger.LegacyPrintf("service.openai_probe", "probe_build_request_failed: account_id=%d err=%v", accountID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
req.Header.Set("Authorization", "Bearer "+apiKey)
|
req.Header.Set("Authorization", "Bearer "+apiKey)
|
||||||
req.Header.Set("Accept", "application/json")
|
req.Header.Set("Accept", "application/json")
|
||||||
|
|||||||
@ -135,6 +135,7 @@ func (s *OpenAIGatewayService) forwardAsRawChatCompletions(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("build upstream request: %w", err)
|
return nil, fmt.Errorf("build upstream request: %w", err)
|
||||||
}
|
}
|
||||||
|
upstreamReq = upstreamReq.WithContext(WithHTTPUpstreamProfile(upstreamReq.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
upstreamReq.Header.Set("Content-Type", "application/json")
|
upstreamReq.Header.Set("Content-Type", "application/json")
|
||||||
upstreamReq.Header.Set("Authorization", "Bearer "+apiKey)
|
upstreamReq.Header.Set("Authorization", "Bearer "+apiKey)
|
||||||
if clientStream {
|
if clientStream {
|
||||||
|
|||||||
@ -116,6 +116,7 @@ func TestForwardAsRawChatCompletions_ForcesStreamUsageUpstreamAndPassesUsageDown
|
|||||||
require.Equal(t, 3, result.Usage.CacheReadInputTokens)
|
require.Equal(t, 3, result.Usage.CacheReadInputTokens)
|
||||||
require.NotNil(t, upstream.lastReq)
|
require.NotNil(t, upstream.lastReq)
|
||||||
require.NoError(t, upstream.lastReq.Context().Err())
|
require.NoError(t, upstream.lastReq.Context().Err())
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.True(t, gjson.GetBytes(upstream.lastBody, "stream_options.include_usage").Bool())
|
require.True(t, gjson.GetBytes(upstream.lastBody, "stream_options.include_usage").Bool())
|
||||||
require.Contains(t, rec.Body.String(), `"usage"`)
|
require.Contains(t, rec.Body.String(), `"usage"`)
|
||||||
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
||||||
|
|||||||
@ -116,6 +116,7 @@ func (s *OpenAIGatewayService) forwardResponsesViaRawChatCompletions(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("build upstream request: %w", err)
|
return nil, fmt.Errorf("build upstream request: %w", err)
|
||||||
}
|
}
|
||||||
|
upstreamReq = upstreamReq.WithContext(WithHTTPUpstreamProfile(upstreamReq.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
upstreamReq.Header.Set("Content-Type", "application/json")
|
upstreamReq.Header.Set("Content-Type", "application/json")
|
||||||
upstreamReq.Header.Set("Authorization", "Bearer "+apiKey)
|
upstreamReq.Header.Set("Authorization", "Bearer "+apiKey)
|
||||||
if clientStream {
|
if clientStream {
|
||||||
|
|||||||
@ -42,6 +42,7 @@ func TestForwardResponses_ForceChatCompletionsRoutesNonStreamingToChatCompletion
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, result)
|
require.NotNil(t, result)
|
||||||
require.Equal(t, "http://upstream.example/v1/chat/completions", upstream.lastReq.URL.String())
|
require.Equal(t, "http://upstream.example/v1/chat/completions", upstream.lastReq.URL.String())
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Equal(t, "hello", gjson.GetBytes(upstream.lastBody, "messages.0.content").String())
|
require.Equal(t, "hello", gjson.GetBytes(upstream.lastBody, "messages.0.content").String())
|
||||||
require.False(t, gjson.GetBytes(upstream.lastBody, "input").Exists())
|
require.False(t, gjson.GetBytes(upstream.lastBody, "input").Exists())
|
||||||
require.Equal(t, "response", gjson.Get(rec.Body.String(), "object").String())
|
require.Equal(t, "response", gjson.Get(rec.Body.String(), "object").String())
|
||||||
|
|||||||
@ -3229,6 +3229,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
|
|
||||||
// 透传客户端请求头(安全白名单)。
|
// 透传客户端请求头(安全白名单)。
|
||||||
allowTimeoutHeaders := s.isOpenAIPassthroughTimeoutHeadersAllowed()
|
allowTimeoutHeaders := s.isOpenAIPassthroughTimeoutHeadersAllowed()
|
||||||
@ -3951,6 +3952,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
|
|
||||||
// Set authentication header
|
// Set authentication header
|
||||||
req.Header.Set("authorization", "Bearer "+token)
|
req.Header.Set("authorization", "Bearer "+token)
|
||||||
|
|||||||
@ -1841,6 +1841,7 @@ func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCompactPath(t *test
|
|||||||
require.Equal(t, "application/json", req.Header.Get("Accept"))
|
require.Equal(t, "application/json", req.Header.Get("Accept"))
|
||||||
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
|
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
|
||||||
require.NotEmpty(t, req.Header.Get("Session_Id"))
|
require.NotEmpty(t, req.Header.Get("Session_Id"))
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(req.Context()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) {
|
func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) {
|
||||||
@ -1861,6 +1862,7 @@ func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T)
|
|||||||
require.Equal(t, "application/json", req.Header.Get("Accept"))
|
require.Equal(t, "application/json", req.Header.Get("Accept"))
|
||||||
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
|
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
|
||||||
require.NotEmpty(t, req.Header.Get("Session_Id"))
|
require.NotEmpty(t, req.Header.Get("Session_Id"))
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(req.Context()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) {
|
func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) {
|
||||||
|
|||||||
@ -743,6 +743,7 @@ func (s *OpenAIGatewayService) buildOpenAIImagesRequest(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI))
|
||||||
req.Header.Set("Authorization", "Bearer "+token)
|
req.Header.Set("Authorization", "Bearer "+token)
|
||||||
for key, values := range c.Request.Header {
|
for key, values := range c.Request.Header {
|
||||||
if !openaiPassthroughAllowedHeaders[strings.ToLower(key)] {
|
if !openaiPassthroughAllowedHeaders[strings.ToLower(key)] {
|
||||||
|
|||||||
@ -528,6 +528,7 @@ func TestOpenAIGatewayServiceForwardImages_OAuthPassesNAndReturnsAllImages(t *te
|
|||||||
require.NotNil(t, upstream.lastReq)
|
require.NotNil(t, upstream.lastReq)
|
||||||
require.Equal(t, chatgptCodexURL, upstream.lastReq.URL.String())
|
require.Equal(t, chatgptCodexURL, upstream.lastReq.URL.String())
|
||||||
require.Equal(t, "chatgpt.com", upstream.lastReq.Host)
|
require.Equal(t, "chatgpt.com", upstream.lastReq.Host)
|
||||||
|
require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context()))
|
||||||
require.Equal(t, "application/json", upstream.lastReq.Header.Get("Content-Type"))
|
require.Equal(t, "application/json", upstream.lastReq.Header.Get("Content-Type"))
|
||||||
require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept"))
|
require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept"))
|
||||||
require.Equal(t, "acct-123", upstream.lastReq.Header.Get("chatgpt-account-id"))
|
require.Equal(t, "acct-123", upstream.lastReq.Header.Get("chatgpt-account-id"))
|
||||||
|
|||||||
@ -251,6 +251,14 @@ RATE_LIMIT_OVERLOAD_COOLDOWN_MINUTES=10
|
|||||||
#
|
#
|
||||||
# 默认:false
|
# 默认:false
|
||||||
GATEWAY_FORCE_CODEX_CLI=false
|
GATEWAY_FORCE_CODEX_CLI=false
|
||||||
|
# OpenAI/Codex 等待上游响应头超时(秒);0 表示不使用本地响应头超时截断。
|
||||||
|
GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=0
|
||||||
|
# OpenAI HTTP 上游默认启用 HTTP/2;如需紧急回滚可设为 false。
|
||||||
|
GATEWAY_OPENAI_HTTP2_ENABLED=true
|
||||||
|
GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=true
|
||||||
|
GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=2
|
||||||
|
GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=60
|
||||||
|
GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=600
|
||||||
# 上游连接池:每主机最大连接数(默认 1024;流式/HTTP1.1 场景可调大,如 2400/4096)
|
# 上游连接池:每主机最大连接数(默认 1024;流式/HTTP1.1 场景可调大,如 2400/4096)
|
||||||
GATEWAY_MAX_CONNS_PER_HOST=2048
|
GATEWAY_MAX_CONNS_PER_HOST=2048
|
||||||
# 上游连接池:最大空闲连接总数(默认 2560;账号/代理隔离 + 高并发场景可调大)
|
# 上游连接池:最大空闲连接总数(默认 2560;账号/代理隔离 + 高并发场景可调大)
|
||||||
|
|||||||
@ -149,6 +149,9 @@ gateway:
|
|||||||
# Timeout for waiting upstream response headers (seconds)
|
# Timeout for waiting upstream response headers (seconds)
|
||||||
# 等待上游响应头超时时间(秒)
|
# 等待上游响应头超时时间(秒)
|
||||||
response_header_timeout: 600
|
response_header_timeout: 600
|
||||||
|
# OpenAI/Codex upstream response header timeout (seconds, 0=disabled)
|
||||||
|
# OpenAI/Codex 等待上游响应头超时时间(秒,0=禁用本地响应头超时)
|
||||||
|
openai_response_header_timeout: 0
|
||||||
# Max request body size in bytes (default: 256MB)
|
# Max request body size in bytes (default: 256MB)
|
||||||
# 请求体最大字节数(默认 256MB)
|
# 请求体最大字节数(默认 256MB)
|
||||||
max_body_size: 268435456
|
max_body_size: 268435456
|
||||||
@ -317,6 +320,14 @@ gateway:
|
|||||||
queue: 0.7
|
queue: 0.7
|
||||||
error_rate: 0.8
|
error_rate: 0.8
|
||||||
ttft: 0.5
|
ttft: 0.5
|
||||||
|
# OpenAI HTTP upstream protocol strategy.
|
||||||
|
# OpenAI HTTP 上游协议策略(默认 HTTP/2;代理明确不兼容时可临时回退 HTTP/1.1)。
|
||||||
|
openai_http2:
|
||||||
|
enabled: true
|
||||||
|
allow_proxy_fallback_to_http1: true
|
||||||
|
fallback_error_threshold: 2
|
||||||
|
fallback_window_seconds: 60
|
||||||
|
fallback_ttl_seconds: 600
|
||||||
# HTTP upstream connection pool settings (HTTP/2 + multi-proxy scenario defaults)
|
# HTTP upstream connection pool settings (HTTP/2 + multi-proxy scenario defaults)
|
||||||
# HTTP 上游连接池配置(HTTP/2 + 多代理场景默认值)
|
# HTTP 上游连接池配置(HTTP/2 + 多代理场景默认值)
|
||||||
# Max idle connections across all hosts
|
# Max idle connections across all hosts
|
||||||
|
|||||||
@ -40,6 +40,13 @@ services:
|
|||||||
- JWT_SECRET=${JWT_SECRET:-}
|
- JWT_SECRET=${JWT_SECRET:-}
|
||||||
- TOTP_ENCRYPTION_KEY=${TOTP_ENCRYPTION_KEY:-}
|
- TOTP_ENCRYPTION_KEY=${TOTP_ENCRYPTION_KEY:-}
|
||||||
- TZ=${TZ:-Asia/Shanghai}
|
- TZ=${TZ:-Asia/Shanghai}
|
||||||
|
# OpenAI HTTP upstream protocol/timeout
|
||||||
|
- GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600}
|
||||||
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
||||||
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
||||||
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
||||||
|
|||||||
@ -151,6 +151,13 @@ services:
|
|||||||
# =======================================================================
|
# =======================================================================
|
||||||
# Image Generation Stream & Concurrency
|
# Image Generation Stream & Concurrency
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
|
# OpenAI HTTP upstream protocol/timeout
|
||||||
|
- GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600}
|
||||||
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
||||||
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
||||||
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
||||||
|
|||||||
@ -98,6 +98,13 @@ services:
|
|||||||
# =======================================================================
|
# =======================================================================
|
||||||
# Image Generation Stream & Concurrency
|
# Image Generation Stream & Concurrency
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
|
# OpenAI HTTP upstream protocol/timeout
|
||||||
|
- GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600}
|
||||||
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
||||||
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
||||||
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
||||||
|
|||||||
@ -147,6 +147,13 @@ services:
|
|||||||
# =======================================================================
|
# =======================================================================
|
||||||
# Image Generation Stream & Concurrency
|
# Image Generation Stream & Concurrency
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
|
# OpenAI HTTP upstream protocol/timeout
|
||||||
|
- GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60}
|
||||||
|
- GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600}
|
||||||
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
- GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900}
|
||||||
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
- GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10}
|
||||||
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
- GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user