From 33ac8eb27d799c85a9a8c86849d069c13a212b75 Mon Sep 17 00:00:00 2001 From: mt21625457 <32916545+mt21625457@users.noreply.github.com> Date: Tue, 26 May 2026 13:52:38 +0800 Subject: [PATCH] fix openai http2 response header timeout --- backend/internal/config/config.go | 42 ++ backend/internal/config/config_test.go | 60 +++ backend/internal/repository/http_upstream.go | 389 ++++++++++++++++-- .../http_upstream_benchmark_test.go | 2 +- .../internal/repository/http_upstream_test.go | 138 ++++++- .../internal/service/account_test_service.go | 5 + ...ccount_test_service_openai_compact_test.go | 1 + .../account_test_service_openai_image_test.go | 3 + .../account_test_service_openai_test.go | 3 + .../internal/service/http_upstream_profile.go | 42 ++ .../service/http_upstream_profile_test.go | 21 + .../service/openai_apikey_responses_probe.go | 1 + .../openai_gateway_chat_completions_raw.go | 1 + ...penai_gateway_chat_completions_raw_test.go | 1 + .../openai_gateway_responses_chat_fallback.go | 1 + ...ai_gateway_responses_chat_fallback_test.go | 1 + .../service/openai_gateway_service.go | 2 + .../service/openai_gateway_service_test.go | 2 + backend/internal/service/openai_images.go | 1 + .../internal/service/openai_images_test.go | 1 + deploy/.env.example | 8 + deploy/config.example.yaml | 11 + deploy/docker-compose.dev.yml | 7 + deploy/docker-compose.local.yml | 7 + deploy/docker-compose.standalone.yml | 7 + deploy/docker-compose.yml | 7 + 26 files changed, 717 insertions(+), 47 deletions(-) create mode 100644 backend/internal/service/http_upstream_profile.go create mode 100644 backend/internal/service/http_upstream_profile_test.go diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 661e3296..dd0382c2 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -680,6 +680,9 @@ type GatewayConfig struct { // 等待上游响应头的超时时间(秒),0表示无超时 // 注意:这不影响流式数据传输,只控制等待响应头的时间 ResponseHeaderTimeout int `mapstructure:"response_header_timeout"` + // OpenAIResponseHeaderTimeout: OpenAI/Codex 上游等待响应头的超时时间(秒),0表示无超时 + // OpenAI/Codex 请求可能在上游排队较久;默认不使用通用响应头超时截断。 + OpenAIResponseHeaderTimeout int `mapstructure:"openai_response_header_timeout"` // 请求体最大字节数,用于网关请求体大小限制 MaxBodySize int64 `mapstructure:"max_body_size"` // 非流式上游响应体读取上限(字节),用于防止无界读取导致内存放大 @@ -707,6 +710,8 @@ type GatewayConfig struct { OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"` // OpenAIWS: OpenAI Responses WebSocket 配置(默认开启,可按需回滚到 HTTP) OpenAIWS GatewayOpenAIWSConfig `mapstructure:"openai_ws"` + // OpenAIHTTP2: OpenAI HTTP 上游协议策略(默认启用 HTTP/2,可按代理能力回退 HTTP/1.1) + OpenAIHTTP2 GatewayOpenAIHTTP2Config `mapstructure:"openai_http2"` // ImageConcurrency: 图片生成独立并发限制配置(默认关闭) ImageConcurrency ImageConcurrencyConfig `mapstructure:"image_concurrency"` @@ -785,6 +790,21 @@ type GatewayConfig struct { UserMessageQueue UserMessageQueueConfig `mapstructure:"user_message_queue"` } +// GatewayOpenAIHTTP2Config OpenAI HTTP 上游协议配置。 +// 默认启用 HTTP/2;在部分代理不兼容时按策略回退 HTTP/1.1。 +type GatewayOpenAIHTTP2Config struct { + // Enabled: 是否启用 OpenAI HTTP/2 优先策略 + Enabled bool `mapstructure:"enabled"` + // AllowProxyFallbackToHTTP1: HTTP/HTTPS 代理出现明确 H2 兼容错误时,临时回退 HTTP/1.1 + AllowProxyFallbackToHTTP1 bool `mapstructure:"allow_proxy_fallback_to_http1"` + // FallbackErrorThreshold: 回退窗口内累计多少次兼容错误后触发回退 + FallbackErrorThreshold int `mapstructure:"fallback_error_threshold"` + // FallbackWindowSeconds: 统计兼容错误的时间窗口(秒) + FallbackWindowSeconds int `mapstructure:"fallback_window_seconds"` + // FallbackTTLSeconds: 触发后回退 HTTP/1.1 的持续时间(秒) + FallbackTTLSeconds int `mapstructure:"fallback_ttl_seconds"` +} + // UserMessageQueueConfig 用户消息串行队列配置 // 用于 Anthropic OAuth/SetupToken 账号的用户消息串行化发送 type UserMessageQueueConfig struct { @@ -1743,6 +1763,7 @@ func setDefaults() { // Gateway viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久 + viper.SetDefault("gateway.openai_response_header_timeout", 0) viper.SetDefault("gateway.log_upstream_error_body", true) viper.SetDefault("gateway.log_upstream_error_body_max_bytes", 2048) viper.SetDefault("gateway.inject_beta_for_apikey", false) @@ -1798,6 +1819,12 @@ func setDefaults() { viper.SetDefault("gateway.openai_ws.scheduler_score_weights.queue", 0.7) viper.SetDefault("gateway.openai_ws.scheduler_score_weights.error_rate", 0.8) viper.SetDefault("gateway.openai_ws.scheduler_score_weights.ttft", 0.5) + // OpenAI HTTP upstream protocol strategy + viper.SetDefault("gateway.openai_http2.enabled", true) + viper.SetDefault("gateway.openai_http2.allow_proxy_fallback_to_http1", true) + viper.SetDefault("gateway.openai_http2.fallback_error_threshold", 2) + viper.SetDefault("gateway.openai_http2.fallback_window_seconds", 60) + viper.SetDefault("gateway.openai_http2.fallback_ttl_seconds", 600) viper.SetDefault("gateway.image_concurrency.enabled", false) viper.SetDefault("gateway.image_concurrency.max_concurrent_requests", 0) viper.SetDefault("gateway.image_concurrency.overflow_mode", ImageConcurrencyOverflowModeReject) @@ -2365,6 +2392,12 @@ func (c *Config) Validate() error { if c.Gateway.ProxyProbeResponseReadMaxBytes <= 0 { return fmt.Errorf("gateway.proxy_probe_response_read_max_bytes must be positive") } + if c.Gateway.ResponseHeaderTimeout < 0 { + return fmt.Errorf("gateway.response_header_timeout must be non-negative") + } + if c.Gateway.OpenAIResponseHeaderTimeout < 0 { + return fmt.Errorf("gateway.openai_response_header_timeout must be non-negative") + } if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" { switch c.Gateway.ConnectionPoolIsolation { case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy: @@ -2539,6 +2572,15 @@ func (c *Config) Validate() error { if c.Gateway.OpenAIWS.StickyPreviousResponseTTLSeconds < 0 { return fmt.Errorf("gateway.openai_ws.sticky_previous_response_ttl_seconds must be non-negative") } + if c.Gateway.OpenAIHTTP2.FallbackErrorThreshold < 0 { + return fmt.Errorf("gateway.openai_http2.fallback_error_threshold must be non-negative") + } + if c.Gateway.OpenAIHTTP2.FallbackWindowSeconds < 0 { + return fmt.Errorf("gateway.openai_http2.fallback_window_seconds must be non-negative") + } + if c.Gateway.OpenAIHTTP2.FallbackTTLSeconds < 0 { + return fmt.Errorf("gateway.openai_http2.fallback_ttl_seconds must be non-negative") + } if c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority < 0 || c.Gateway.OpenAIWS.SchedulerScoreWeights.Load < 0 || c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 || diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 99fec46c..1eae5ed9 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -163,6 +163,41 @@ func TestLoadDefaultOpenAIWSConfig(t *testing.T) { } } +func TestLoadDefaultOpenAIHTTP2Enabled(t *testing.T) { + resetViperWithJWTSecret(t) + + cfg, err := Load() + require.NoError(t, err) + require.True(t, cfg.Gateway.OpenAIHTTP2.Enabled) + require.True(t, cfg.Gateway.OpenAIHTTP2.AllowProxyFallbackToHTTP1) +} + +func TestLoadOpenAIHTTP2DisabledFromEnv(t *testing.T) { + resetViperWithJWTSecret(t) + t.Setenv("GATEWAY_OPENAI_HTTP2_ENABLED", "false") + + cfg, err := Load() + require.NoError(t, err) + require.False(t, cfg.Gateway.OpenAIHTTP2.Enabled) +} + +func TestLoadDefaultOpenAIResponseHeaderTimeoutUnlimited(t *testing.T) { + resetViperWithJWTSecret(t) + + cfg, err := Load() + require.NoError(t, err) + require.Equal(t, 0, cfg.Gateway.OpenAIResponseHeaderTimeout) +} + +func TestLoadOpenAIResponseHeaderTimeoutFromEnv(t *testing.T) { + resetViperWithJWTSecret(t) + t.Setenv("GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT", "1800") + + cfg, err := Load() + require.NoError(t, err) + require.Equal(t, 1800, cfg.Gateway.OpenAIResponseHeaderTimeout) +} + func TestLoadOpenAIWSStickyTTLCompatibility(t *testing.T) { resetViperWithJWTSecret(t) t.Setenv("GATEWAY_OPENAI_WS_STICKY_RESPONSE_ID_TTL_SECONDS", "0") @@ -1220,6 +1255,16 @@ func TestValidateConfigErrors(t *testing.T) { mutate: func(c *Config) { c.Gateway.MaxBodySize = 0 }, wantErr: "gateway.max_body_size", }, + { + name: "gateway response header timeout", + mutate: func(c *Config) { c.Gateway.ResponseHeaderTimeout = -1 }, + wantErr: "gateway.response_header_timeout", + }, + { + name: "gateway openai response header timeout", + mutate: func(c *Config) { c.Gateway.OpenAIResponseHeaderTimeout = -1 }, + wantErr: "gateway.openai_response_header_timeout", + }, { name: "gateway max idle conns", mutate: func(c *Config) { c.Gateway.MaxIdleConns = 0 }, @@ -1275,6 +1320,21 @@ func TestValidateConfigErrors(t *testing.T) { mutate: func(c *Config) { c.Gateway.OpenAIWS.APIKeyMaxConnsFactor = 0 }, wantErr: "gateway.openai_ws.apikey_max_conns_factor", }, + { + name: "gateway openai http2 fallback threshold", + mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackErrorThreshold = -1 }, + wantErr: "gateway.openai_http2.fallback_error_threshold", + }, + { + name: "gateway openai http2 fallback window", + mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackWindowSeconds = -1 }, + wantErr: "gateway.openai_http2.fallback_window_seconds", + }, + { + name: "gateway openai http2 fallback ttl", + mutate: func(c *Config) { c.Gateway.OpenAIHTTP2.FallbackTTLSeconds = -1 }, + wantErr: "gateway.openai_http2.fallback_ttl_seconds", + }, { name: "gateway stream data interval range", mutate: func(c *Config) { c.Gateway.StreamDataIntervalTimeout = 5 }, diff --git a/backend/internal/repository/http_upstream.go b/backend/internal/repository/http_upstream.go index 4309e997..476da3ae 100644 --- a/backend/internal/repository/http_upstream.go +++ b/backend/internal/repository/http_upstream.go @@ -3,6 +3,8 @@ package repository import ( "compress/flate" "compress/gzip" + "context" + "crypto/tls" "errors" "fmt" "io" @@ -49,6 +51,17 @@ const ( defaultMaxUpstreamClients = 5000 // defaultClientIdleTTLSeconds: 默认客户端空闲回收阈值(15分钟) defaultClientIdleTTLSeconds = 900 + // OpenAI HTTP/2 代理回退策略默认值 + defaultOpenAIHTTP2FallbackErrorThreshold = 2 + defaultOpenAIHTTP2FallbackWindow = 60 * time.Second + defaultOpenAIHTTP2FallbackTTL = 10 * time.Minute +) + +const ( + upstreamProtocolModeDefault = "default" + upstreamProtocolModeOpenAIH1 = "openai_h1" + upstreamProtocolModeOpenAIH2 = "openai_h2" + upstreamProtocolModeOpenAIH1Fallback = "openai_h1_fallback" ) var errUpstreamClientLimitReached = errors.New("upstream client cache limit reached") @@ -63,14 +76,30 @@ type poolSettings struct { responseHeaderTimeout time.Duration // 等待响应头超时时间 } +type openAIHTTP2Settings struct { + enabled bool + allowProxyFallbackToHTTP1 bool + fallbackErrorThreshold int + fallbackWindow time.Duration + fallbackTTL time.Duration +} + // upstreamClientEntry 上游客户端缓存条目 // 记录客户端实例及其元数据,用于连接池管理和淘汰策略 type upstreamClientEntry struct { - client *http.Client // HTTP 客户端实例 - proxyKey string // 代理标识(用于检测代理变更) - poolKey string // 连接池配置标识(用于检测配置变更) - lastUsed int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰 - inFlight int64 // 当前进行中的请求数,>0 时不可淘汰 + client *http.Client // HTTP 客户端实例 + proxyKey string // 代理标识(用于检测代理变更) + poolKey string // 连接池配置标识(用于检测配置变更) + protocolMode string // 协议模式(default/openai_h1/openai_h2/openai_h1_fallback) + lastUsed int64 // 最后使用时间戳(纳秒),用于 LRU 淘汰 + inFlight int64 // 当前进行中的请求数,>0 时不可淘汰 +} + +type openAIHTTP2FallbackState struct { + mu sync.Mutex + windowStart time.Time + errorCount int + fallbackUntil time.Time } // httpUpstreamService 通用 HTTP 上游服务 @@ -94,6 +123,8 @@ type httpUpstreamService struct { cfg *config.Config // 全局配置 mu sync.RWMutex // 保护 clients map 的读写锁 clients map[string]*upstreamClientEntry // 客户端缓存池,key 由隔离策略决定 + // OpenAI 走 HTTP/HTTPS 代理时的 H2->H1 回退状态(key=标准化 proxyKey) + openAIHTTP2Fallbacks sync.Map } // NewHTTPUpstream 创建通用 HTTP 上游服务 @@ -131,9 +162,13 @@ func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID i if err := s.validateRequestHost(req); err != nil { return nil, err } + profile := service.HTTPUpstreamProfileDefault + if req != nil { + profile = service.HTTPUpstreamProfileFromContext(req.Context()) + } // 获取或创建对应的客户端,并标记请求占用 - entry, err := s.acquireClient(proxyURL, accountID, accountConcurrency) + entry, err := s.acquireClientWithProfile(proxyURL, accountID, accountConcurrency, profile) if err != nil { return nil, err } @@ -141,11 +176,13 @@ func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID i // 执行请求 resp, err := entry.client.Do(req) if err != nil { + s.recordOpenAIHTTP2Failure(profile, entry.protocolMode, entry.proxyKey, err) // 请求失败,立即减少计数 atomic.AddInt64(&entry.inFlight, -1) atomic.StoreInt64(&entry.lastUsed, time.Now().UnixNano()) return nil, err } + s.recordOpenAIHTTP2Success(profile, entry.protocolMode, entry.proxyKey) // 如果上游返回了压缩内容,解压后再交给业务层 decompressResponseBody(resp) @@ -168,6 +205,10 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco if profile == nil { return s.Do(req, proxyURL, accountID, accountConcurrency) } + upstreamProfile := service.HTTPUpstreamProfileDefault + if req != nil { + upstreamProfile = service.HTTPUpstreamProfileFromContext(req.Context()) + } targetHost := "" if req != nil && req.URL != nil { @@ -183,7 +224,7 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco return nil, err } - entry, err := s.acquireClientWithTLS(proxyURL, accountID, accountConcurrency, profile) + entry, err := s.acquireClientWithTLS(proxyURL, accountID, accountConcurrency, profile, upstreamProfile) if err != nil { slog.Debug("tls_fingerprint_acquire_client_failed", "account_id", accountID, "error", err) return nil, err @@ -208,21 +249,23 @@ func (s *httpUpstreamService) DoWithTLS(req *http.Request, proxyURL string, acco } // acquireClientWithTLS 获取或创建带 TLS 指纹的客户端 -func (s *httpUpstreamService) acquireClientWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile) (*upstreamClientEntry, error) { - return s.getClientEntryWithTLS(proxyURL, accountID, accountConcurrency, profile, true, true) +func (s *httpUpstreamService) acquireClientWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, upstreamProfile service.HTTPUpstreamProfile) (*upstreamClientEntry, error) { + return s.getClientEntryWithTLS(proxyURL, accountID, accountConcurrency, profile, upstreamProfile, true, true) } // getClientEntryWithTLS 获取或创建带 TLS 指纹的客户端条目 // TLS 指纹客户端使用独立的缓存键,与普通客户端隔离 -func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) { +func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID int64, accountConcurrency int, profile *tlsfingerprint.Profile, upstreamProfile service.HTTPUpstreamProfile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) { isolation := s.getIsolationMode() proxyKey, parsedProxy, err := normalizeProxyURL(proxyURL) if err != nil { return nil, err } + settings := s.resolvePoolSettings(isolation, accountConcurrency) + settings = s.applyProfilePoolSettings(settings, upstreamProfile) // TLS 指纹客户端使用独立的缓存键,加 "tls:" 前缀 - cacheKey := "tls:" + buildCacheKey(isolation, proxyKey, accountID) - poolKey := s.buildPoolKey(isolation, accountConcurrency) + ":tls" + cacheKey := "tls:" + buildCacheKey(isolation, proxyKey, accountID, upstreamProtocolModeDefault) + poolKey := buildPoolKey(settings, upstreamProtocolModeDefault) + ":tls" now := time.Now() nowUnix := now.UnixNano() @@ -273,7 +316,6 @@ func (s *httpUpstreamService) getClientEntryWithTLS(proxyURL string, accountID i // 创建带 TLS 指纹的 Transport slog.Debug("tls_fingerprint_creating_new_client", "account_id", accountID, "cache_key", cacheKey, "proxy", proxyKey) - settings := s.resolvePoolSettings(isolation, accountConcurrency) transport, err := buildUpstreamTransportWithTLSFingerprint(settings, parsedProxy, profile) if err != nil { s.mu.Unlock() @@ -339,7 +381,12 @@ func (s *httpUpstreamService) redirectChecker(req *http.Request, via []*http.Req // acquireClient 获取或创建客户端,并标记为进行中请求 // 用于请求路径,避免在获取后被淘汰 func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) { - return s.getClientEntry(proxyURL, accountID, accountConcurrency, true, true) + return s.acquireClientWithProfile(proxyURL, accountID, accountConcurrency, service.HTTPUpstreamProfileDefault) +} + +// acquireClientWithProfile 获取或创建客户端,并按请求 profile 选择协议策略。 +func (s *httpUpstreamService) acquireClientWithProfile(proxyURL string, accountID int64, accountConcurrency int, profile service.HTTPUpstreamProfile) (*upstreamClientEntry, error) { + return s.getClientEntry(proxyURL, accountID, accountConcurrency, profile, true, true) } // getOrCreateClient 获取或创建客户端 @@ -358,13 +405,13 @@ func (s *httpUpstreamService) acquireClient(proxyURL string, accountID int64, ac // - account: 按账户隔离,同一账户共享客户端(代理变更时重建) // - account_proxy: 按账户+代理组合隔离,最细粒度 func (s *httpUpstreamService) getOrCreateClient(proxyURL string, accountID int64, accountConcurrency int) (*upstreamClientEntry, error) { - return s.getClientEntry(proxyURL, accountID, accountConcurrency, false, false) + return s.getClientEntry(proxyURL, accountID, accountConcurrency, service.HTTPUpstreamProfileDefault, false, false) } // getClientEntry 获取或创建客户端条目 // markInFlight=true 时会标记进行中请求,用于请求路径防止被淘汰 // enforceLimit=true 时会限制客户端数量,超限且无法淘汰时返回错误 -func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) { +func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, accountConcurrency int, profile service.HTTPUpstreamProfile, markInFlight bool, enforceLimit bool) (*upstreamClientEntry, error) { // 获取隔离模式 isolation := s.getIsolationMode() // 标准化代理 URL 并解析 @@ -372,10 +419,14 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a if err != nil { return nil, err } + // 根据请求 profile(例如 OpenAI)选择协议模式 + protocolMode := s.resolveProtocolMode(profile, proxyKey, parsedProxy) + settings := s.resolvePoolSettings(isolation, accountConcurrency) + settings = s.applyProfilePoolSettings(settings, profile) // 构建缓存键(根据隔离策略不同) - cacheKey := buildCacheKey(isolation, proxyKey, accountID) + cacheKey := buildCacheKey(isolation, proxyKey, accountID, protocolMode) // 构建连接池配置键(用于检测配置变更) - poolKey := s.buildPoolKey(isolation, accountConcurrency) + poolKey := buildPoolKey(settings, protocolMode) now := time.Now() nowUnix := now.UnixNano() @@ -418,8 +469,7 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a } // 缓存未命中或需要重建,创建新客户端 - settings := s.resolvePoolSettings(isolation, accountConcurrency) - transport, err := buildUpstreamTransport(settings, parsedProxy) + transport, err := buildUpstreamTransport(settings, parsedProxy, protocolMode) if err != nil { s.mu.Unlock() return nil, fmt.Errorf("build transport: %w", err) @@ -429,9 +479,10 @@ func (s *httpUpstreamService) getClientEntry(proxyURL string, accountID int64, a client.CheckRedirect = s.redirectChecker } entry := &upstreamClientEntry{ - client: client, - proxyKey: proxyKey, - poolKey: poolKey, + client: client, + proxyKey: proxyKey, + poolKey: poolKey, + protocolMode: protocolMode, } atomic.StoreInt64(&entry.lastUsed, nowUnix) if markInFlight { @@ -615,22 +666,31 @@ func (s *httpUpstreamService) resolvePoolSettings(isolation string, accountConcu return settings } -// buildPoolKey 构建连接池配置键 -// 用于检测配置变更,配置变更时需要重建客户端 -// -// 参数: -// - isolation: 隔离模式 -// - accountConcurrency: 账户并发限制 -// -// 返回: -// - string: 配置键 -func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency int) string { - if isolation == config.ConnectionPoolIsolationAccount || isolation == config.ConnectionPoolIsolationAccountProxy { - if accountConcurrency > 0 { - return fmt.Sprintf("account:%d", accountConcurrency) - } +func (s *httpUpstreamService) applyProfilePoolSettings(settings poolSettings, profile service.HTTPUpstreamProfile) poolSettings { + if profile != service.HTTPUpstreamProfileOpenAI { + return settings } - return "default" + settings.responseHeaderTimeout = 0 + if s != nil && s.cfg != nil && s.cfg.Gateway.OpenAIResponseHeaderTimeout > 0 { + settings.responseHeaderTimeout = time.Duration(s.cfg.Gateway.OpenAIResponseHeaderTimeout) * time.Second + } + return settings +} + +// buildPoolKey 构建连接池配置键,用于检测连接池配置变更。 +func buildPoolKey(settings poolSettings, protocolMode string) string { + base := fmt.Sprintf( + "idle:%d|idle_host:%d|max:%d|idle_timeout:%s|header_timeout:%s", + settings.maxIdleConns, + settings.maxIdleConnsPerHost, + settings.maxConnsPerHost, + settings.idleConnTimeout, + settings.responseHeaderTimeout, + ) + if protocolMode == "" || protocolMode == upstreamProtocolModeDefault { + return base + } + return base + "|proto:" + protocolMode } // buildCacheKey 构建客户端缓存键 @@ -648,15 +708,245 @@ func (s *httpUpstreamService) buildPoolKey(isolation string, accountConcurrency // - proxy 模式: "proxy:{proxyKey}" // - account 模式: "account:{accountID}" // - account_proxy 模式: "account:{accountID}|proxy:{proxyKey}" -func buildCacheKey(isolation, proxyKey string, accountID int64) string { +func buildCacheKey(isolation, proxyKey string, accountID int64, protocolMode string) string { + var base string switch isolation { case config.ConnectionPoolIsolationAccount: - return fmt.Sprintf("account:%d", accountID) + base = fmt.Sprintf("account:%d", accountID) case config.ConnectionPoolIsolationAccountProxy: - return fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey) + base = fmt.Sprintf("account:%d|proxy:%s", accountID, proxyKey) default: - return fmt.Sprintf("proxy:%s", proxyKey) + base = fmt.Sprintf("proxy:%s", proxyKey) } + if protocolMode != "" && protocolMode != upstreamProtocolModeDefault { + base += "|proto:" + protocolMode + } + return base +} + +func (s *httpUpstreamService) resolveOpenAIHTTP2Settings() openAIHTTP2Settings { + settings := openAIHTTP2Settings{ + enabled: false, + allowProxyFallbackToHTTP1: true, + fallbackErrorThreshold: defaultOpenAIHTTP2FallbackErrorThreshold, + fallbackWindow: defaultOpenAIHTTP2FallbackWindow, + fallbackTTL: defaultOpenAIHTTP2FallbackTTL, + } + if s == nil || s.cfg == nil { + return settings + } + cfg := s.cfg.Gateway.OpenAIHTTP2 + settings.enabled = cfg.Enabled + settings.allowProxyFallbackToHTTP1 = cfg.AllowProxyFallbackToHTTP1 + if cfg.FallbackErrorThreshold > 0 { + settings.fallbackErrorThreshold = cfg.FallbackErrorThreshold + } + if cfg.FallbackWindowSeconds > 0 { + settings.fallbackWindow = time.Duration(cfg.FallbackWindowSeconds) * time.Second + } + if cfg.FallbackTTLSeconds > 0 { + settings.fallbackTTL = time.Duration(cfg.FallbackTTLSeconds) * time.Second + } + return settings +} + +func (s *httpUpstreamService) resolveProtocolMode(profile service.HTTPUpstreamProfile, proxyKey string, parsedProxy *url.URL) string { + if profile != service.HTTPUpstreamProfileOpenAI { + return upstreamProtocolModeDefault + } + settings := s.resolveOpenAIHTTP2Settings() + if !settings.enabled { + return upstreamProtocolModeOpenAIH1 + } + if parsedProxy == nil { + return upstreamProtocolModeOpenAIH2 + } + scheme := strings.ToLower(parsedProxy.Scheme) + if scheme != "http" && scheme != "https" { + return upstreamProtocolModeOpenAIH2 + } + if settings.allowProxyFallbackToHTTP1 && s.isOpenAIHTTP2FallbackActive(proxyKey) { + return upstreamProtocolModeOpenAIH1Fallback + } + return upstreamProtocolModeOpenAIH2 +} + +func (s *httpUpstreamService) isOpenAIHTTP2FallbackActive(proxyKey string) bool { + raw, ok := s.openAIHTTP2Fallbacks.Load(proxyKey) + if !ok { + return false + } + state, ok := raw.(*openAIHTTP2FallbackState) + if !ok || state == nil { + return false + } + return state.isFallbackActive(time.Now()) +} + +func (s *httpUpstreamService) getOrCreateOpenAIHTTP2FallbackState(proxyKey string) *openAIHTTP2FallbackState { + state := &openAIHTTP2FallbackState{} + actual, _ := s.openAIHTTP2Fallbacks.LoadOrStore(proxyKey, state) + cached, ok := actual.(*openAIHTTP2FallbackState) + if !ok || cached == nil { + return state + } + return cached +} + +func isHTTPProxyKey(proxyKey string) bool { + return strings.HasPrefix(proxyKey, "http://") || strings.HasPrefix(proxyKey, "https://") +} + +func isOpenAIHTTP2CompatibilityError(err error) bool { + if err == nil { + return false + } + if isUpstreamTimeoutError(err) { + return false + } + msg := strings.ToLower(err.Error()) + if msg == "" { + return false + } + markers := []string{ + "alpn", + "no application protocol", + "protocol error", + "stream error", + "goaway", + "refused_stream", + "frame too large", + } + for _, marker := range markers { + if strings.Contains(msg, marker) { + return true + } + } + return false +} + +func isUpstreamTimeoutError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + msg := strings.ToLower(err.Error()) + if msg == "" { + return false + } + timeoutMarkers := []string{ + "timeout awaiting response headers", + "i/o timeout", + "context deadline exceeded", + "client.timeout exceeded while awaiting headers", + "tls handshake timeout", + } + for _, marker := range timeoutMarkers { + if strings.Contains(msg, marker) { + return true + } + } + return false +} + +func (s *httpUpstreamService) recordOpenAIHTTP2Failure(profile service.HTTPUpstreamProfile, protocolMode, proxyKey string, err error) { + if profile != service.HTTPUpstreamProfileOpenAI || protocolMode != upstreamProtocolModeOpenAIH2 { + return + } + settings := s.resolveOpenAIHTTP2Settings() + if !settings.enabled || !settings.allowProxyFallbackToHTTP1 { + return + } + if !isHTTPProxyKey(proxyKey) || !isOpenAIHTTP2CompatibilityError(err) { + return + } + state := s.getOrCreateOpenAIHTTP2FallbackState(proxyKey) + activated, until := state.recordFailure(time.Now(), settings.fallbackErrorThreshold, settings.fallbackWindow, settings.fallbackTTL) + if activated { + slog.Warn("openai_http2_proxy_fallback_activated", + "proxy", proxyKey, + "fallback_until", until.Format(time.RFC3339)) + } +} + +func (s *httpUpstreamService) recordOpenAIHTTP2Success(profile service.HTTPUpstreamProfile, protocolMode, proxyKey string) { + if profile != service.HTTPUpstreamProfileOpenAI || protocolMode != upstreamProtocolModeOpenAIH2 { + return + } + if !isHTTPProxyKey(proxyKey) { + return + } + raw, ok := s.openAIHTTP2Fallbacks.Load(proxyKey) + if !ok { + return + } + state, ok := raw.(*openAIHTTP2FallbackState) + if !ok || state == nil { + return + } + state.resetErrorWindow() +} + +func (s *openAIHTTP2FallbackState) isFallbackActive(now time.Time) bool { + s.mu.Lock() + defer s.mu.Unlock() + if s.fallbackUntil.IsZero() { + return false + } + if now.Before(s.fallbackUntil) { + return true + } + s.fallbackUntil = time.Time{} + return false +} + +func (s *openAIHTTP2FallbackState) resetErrorWindow() { + s.mu.Lock() + defer s.mu.Unlock() + s.windowStart = time.Time{} + s.errorCount = 0 +} + +func (s *openAIHTTP2FallbackState) recordFailure(now time.Time, threshold int, window, ttl time.Duration) (bool, time.Time) { + if threshold <= 0 { + threshold = defaultOpenAIHTTP2FallbackErrorThreshold + } + if window <= 0 { + window = defaultOpenAIHTTP2FallbackWindow + } + if ttl <= 0 { + ttl = defaultOpenAIHTTP2FallbackTTL + } + + s.mu.Lock() + defer s.mu.Unlock() + + if !s.fallbackUntil.IsZero() && now.Before(s.fallbackUntil) { + return false, s.fallbackUntil + } + if !s.fallbackUntil.IsZero() && !now.Before(s.fallbackUntil) { + s.fallbackUntil = time.Time{} + } + + if s.windowStart.IsZero() || now.Sub(s.windowStart) > window { + s.windowStart = now + s.errorCount = 0 + } + s.errorCount++ + if s.errorCount < threshold { + return false, time.Time{} + } + + s.fallbackUntil = now.Add(ttl) + s.windowStart = time.Time{} + s.errorCount = 0 + return true, s.fallbackUntil } // normalizeProxyURL 标准化代理 URL @@ -728,7 +1018,7 @@ func defaultPoolSettings(cfg *config.Config) poolSettings { if cfg.Gateway.IdleConnTimeoutSeconds > 0 { idleConnTimeout = time.Duration(cfg.Gateway.IdleConnTimeoutSeconds) * time.Second } - if cfg.Gateway.ResponseHeaderTimeout > 0 { + if cfg.Gateway.ResponseHeaderTimeout >= 0 { responseHeaderTimeout = time.Duration(cfg.Gateway.ResponseHeaderTimeout) * time.Second } } @@ -759,7 +1049,7 @@ func defaultPoolSettings(cfg *config.Config) poolSettings { // - MaxConnsPerHost: 每主机最大连接数(达到后新请求等待) // - IdleConnTimeout: 空闲连接超时(超时后关闭) // - ResponseHeaderTimeout: 等待响应头超时(不影响流式传输) -func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) (*http.Transport, error) { +func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL, protocolMode string) (*http.Transport, error) { transport := &http.Transport{ MaxIdleConns: settings.maxIdleConns, MaxIdleConnsPerHost: settings.maxIdleConnsPerHost, @@ -767,6 +1057,17 @@ func buildUpstreamTransport(settings poolSettings, proxyURL *url.URL) (*http.Tra IdleConnTimeout: settings.idleConnTimeout, ResponseHeaderTimeout: settings.responseHeaderTimeout, } + switch protocolMode { + case upstreamProtocolModeOpenAIH2: + transport.ForceAttemptHTTP2 = true + case upstreamProtocolModeOpenAIH1: + transport.ForceAttemptHTTP2 = false + transport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper) + case upstreamProtocolModeOpenAIH1Fallback: + // 显式禁用 HTTP/2,确保代理不兼容场景回退到 HTTP/1.1。 + transport.ForceAttemptHTTP2 = false + transport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper) + } if err := proxyutil.ConfigureTransportProxy(transport, proxyURL); err != nil { return nil, err } diff --git a/backend/internal/repository/http_upstream_benchmark_test.go b/backend/internal/repository/http_upstream_benchmark_test.go index 89892b3b..a92105c1 100644 --- a/backend/internal/repository/http_upstream_benchmark_test.go +++ b/backend/internal/repository/http_upstream_benchmark_test.go @@ -45,7 +45,7 @@ func BenchmarkHTTPUpstreamProxyClient(b *testing.B) { settings := defaultPoolSettings(cfg) for i := 0; i < b.N; i++ { // 每次迭代都创建新客户端,包含 Transport 分配 - transport, err := buildUpstreamTransport(settings, parsedProxy) + transport, err := buildUpstreamTransport(settings, parsedProxy, upstreamProtocolModeDefault) if err != nil { b.Fatalf("创建 Transport 失败: %v", err) } diff --git a/backend/internal/repository/http_upstream_test.go b/backend/internal/repository/http_upstream_test.go index b3268463..f331e005 100644 --- a/backend/internal/repository/http_upstream_test.go +++ b/backend/internal/repository/http_upstream_test.go @@ -1,6 +1,7 @@ package repository import ( + "errors" "io" "net/http" "sync/atomic" @@ -8,6 +9,8 @@ import ( "time" "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/tlsfingerprint" + "github.com/Wei-Shaw/sub2api/internal/service" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -41,12 +44,23 @@ func (s *HTTPUpstreamSuite) newService() *httpUpstreamService { } // TestDefaultResponseHeaderTimeout 测试默认响应头超时配置 -// 验证未配置时使用 300 秒默认值 +// 验证显式 0 会禁用等待响应头超时 func (s *HTTPUpstreamSuite) TestDefaultResponseHeaderTimeout() { svc := s.newService() entry := mustGetOrCreateClient(s.T(), svc, "", 0, 0) transport, ok := entry.client.Transport.(*http.Transport) require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch") +} + +// TestNilConfigResponseHeaderTimeoutFallback 验证 nil 配置使用代码级兜底值。 +func (s *HTTPUpstreamSuite) TestNilConfigResponseHeaderTimeoutFallback() { + up := NewHTTPUpstream(nil) + svc, ok := up.(*httpUpstreamService) + require.True(s.T(), ok, "expected *httpUpstreamService") + entry := mustGetOrCreateClient(s.T(), svc, "", 0, 0) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") require.Equal(s.T(), 300*time.Second, transport.ResponseHeaderTimeout, "ResponseHeaderTimeout mismatch") } @@ -65,10 +79,130 @@ func (s *HTTPUpstreamSuite) TestCustomResponseHeaderTimeout() { // 验证解析失败时拒绝回退到直连模式 func (s *HTTPUpstreamSuite) TestGetOrCreateClient_InvalidURLReturnsError() { svc := s.newService() - _, err := svc.getClientEntry("://bad-proxy-url", 1, 1, false, false) + _, err := svc.getClientEntry("://bad-proxy-url", 1, 1, service.HTTPUpstreamProfileDefault, false, false) require.Error(s.T(), err, "expected error for invalid proxy URL") } +func (s *HTTPUpstreamSuite) TestOpenAIProfileDefaultsToHTTP2AndNoHeaderTimeout() { + s.cfg.Gateway = config.GatewayConfig{ + ResponseHeaderTimeout: 600, + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{ + Enabled: true, + AllowProxyFallbackToHTTP1: true, + }, + } + svc := s.newService() + entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "OpenAI profile should not inherit generic header timeout") + require.True(s.T(), transport.ForceAttemptHTTP2, "OpenAI profile should prefer HTTP/2") + require.Equal(s.T(), upstreamProtocolModeOpenAIH2, entry.protocolMode) +} + +func (s *HTTPUpstreamSuite) TestOpenAIProfileCustomHeaderTimeout() { + s.cfg.Gateway = config.GatewayConfig{ + ResponseHeaderTimeout: 600, + OpenAIResponseHeaderTimeout: 1800, + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{ + Enabled: true, + }, + } + svc := s.newService() + entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), 1800*time.Second, transport.ResponseHeaderTimeout) +} + +func (s *HTTPUpstreamSuite) TestOpenAIProfileTLSFingerprintDoesNotInheritGenericHeaderTimeout() { + s.cfg.Gateway = config.GatewayConfig{ + ResponseHeaderTimeout: 600, + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{ + Enabled: true, + }, + } + svc := s.newService() + entry, err := svc.getClientEntryWithTLS("", 1, 1, &tlsfingerprint.Profile{Name: "test"}, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), time.Duration(0), transport.ResponseHeaderTimeout, "OpenAI TLS path should not inherit generic header timeout") +} + +func (s *HTTPUpstreamSuite) TestOpenAIProfileHTTP2DisabledUsesHTTP1Transport() { + s.cfg.Gateway = config.GatewayConfig{ + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{Enabled: false}, + } + svc := s.newService() + entry, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.False(s.T(), transport.ForceAttemptHTTP2, "OpenAI HTTP/2 disabled should not force H2") + require.NotNil(s.T(), transport.TLSNextProto, "HTTP/1 mode should disable automatic H2 negotiation") + require.Equal(s.T(), upstreamProtocolModeOpenAIH1, entry.protocolMode) +} + +func (s *HTTPUpstreamSuite) TestOpenAIHeaderTimeoutChangeRebuildsClient() { + s.cfg.Gateway = config.GatewayConfig{ + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{Enabled: true}, + } + svc := s.newService() + entry1, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + + s.cfg.Gateway.OpenAIResponseHeaderTimeout = 1800 + entry2, err := svc.getClientEntry("", 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + require.NotSame(s.T(), entry1, entry2, "OpenAI header timeout changes must rebuild cached client") + transport, ok := entry2.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.Equal(s.T(), 1800*time.Second, transport.ResponseHeaderTimeout) +} + +func (s *HTTPUpstreamSuite) TestOpenAIHTTP2TimeoutDoesNotActivateProxyFallback() { + s.cfg.Gateway = config.GatewayConfig{ + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{ + Enabled: true, + AllowProxyFallbackToHTTP1: true, + FallbackErrorThreshold: 1, + FallbackWindowSeconds: 60, + FallbackTTLSeconds: 600, + }, + } + svc := s.newService() + proxyURL := "http://proxy.local:8080" + svc.recordOpenAIHTTP2Failure(service.HTTPUpstreamProfileOpenAI, upstreamProtocolModeOpenAIH2, proxyURL, errors.New("http2: timeout awaiting response headers")) + require.False(s.T(), svc.isOpenAIHTTP2FallbackActive(proxyURL), "header timeout should not be treated as H2 compatibility failure") +} + +func (s *HTTPUpstreamSuite) TestOpenAIHTTP2ProxyCompatibilityErrorActivatesFallback() { + s.cfg.Gateway = config.GatewayConfig{ + OpenAIHTTP2: config.GatewayOpenAIHTTP2Config{ + Enabled: true, + AllowProxyFallbackToHTTP1: true, + FallbackErrorThreshold: 1, + FallbackWindowSeconds: 60, + FallbackTTLSeconds: 600, + }, + } + svc := s.newService() + proxyURL := "http://proxy.local:8080" + svc.recordOpenAIHTTP2Failure(service.HTTPUpstreamProfileOpenAI, upstreamProtocolModeOpenAIH2, proxyURL, errors.New("http2: protocol error")) + require.True(s.T(), svc.isOpenAIHTTP2FallbackActive(proxyURL)) + + entry, err := svc.getClientEntry(proxyURL, 1, 1, service.HTTPUpstreamProfileOpenAI, false, false) + require.NoError(s.T(), err) + transport, ok := entry.client.Transport.(*http.Transport) + require.True(s.T(), ok, "expected *http.Transport") + require.False(s.T(), transport.ForceAttemptHTTP2) + require.NotNil(s.T(), transport.TLSNextProto) + require.Equal(s.T(), upstreamProtocolModeOpenAIH1Fallback, entry.protocolMode) +} + // TestNormalizeProxyURL_Canonicalizes 测试代理 URL 规范化 // 验证等价地址能够映射到同一缓存键 func (s *HTTPUpstreamSuite) TestNormalizeProxyURL_Canonicalizes() { diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index bb448e2d..032c13b1 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -580,6 +580,7 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account if err != nil { return s.sendErrorAndEnd(c, "Failed to create request") } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) // Set common headers req.Header.Set("Content-Type", "application/json") @@ -659,6 +660,7 @@ func (s *AccountTestService) testOpenAIChatCompletionsConnection( if err != nil { return s.sendErrorAndEnd(c, "Failed to create Chat Completions request") } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "text/event-stream") req.Header.Set("Authorization", "Bearer "+authToken) @@ -739,6 +741,7 @@ func (s *AccountTestService) testOpenAICompactConnection(c *gin.Context, account if err != nil { return s.sendErrorAndEnd(c, "Failed to create request") } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") @@ -1505,6 +1508,7 @@ func (s *AccountTestService) testOpenAIImageAPIKey(c *gin.Context, ctx context.C if err != nil { return s.sendErrorAndEnd(c, "Failed to create request") } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+authToken) @@ -1593,6 +1597,7 @@ func (s *AccountTestService) testOpenAIImageOAuth(c *gin.Context, ctx context.Co if err != nil { return s.sendErrorAndEnd(c, "Failed to create request") } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Host = "chatgpt.com" req.Header.Set("Authorization", "Bearer "+authToken) req.Header.Set("Content-Type", "application/json") diff --git a/backend/internal/service/account_test_service_openai_compact_test.go b/backend/internal/service/account_test_service_openai_compact_test.go index 9eb98fdc..c9849e04 100644 --- a/backend/internal/service/account_test_service_openai_compact_test.go +++ b/backend/internal/service/account_test_service_openai_compact_test.go @@ -57,6 +57,7 @@ func TestAccountTestService_TestAccountConnection_OpenAICompactOAuthSuccessPersi require.Equal(t, "application/json", upstream.lastReq.Header.Get("Accept")) require.Equal(t, codexCLIVersion, upstream.lastReq.Header.Get("Version")) require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id")) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Equal(t, codexCLIUserAgent, upstream.lastReq.Header.Get("User-Agent")) require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id")) require.Equal(t, "gpt-5.4", gjson.GetBytes(upstream.lastBody, "model").String()) diff --git a/backend/internal/service/account_test_service_openai_image_test.go b/backend/internal/service/account_test_service_openai_image_test.go index 257159c4..9c24070c 100644 --- a/backend/internal/service/account_test_service_openai_image_test.go +++ b/backend/internal/service/account_test_service_openai_image_test.go @@ -45,6 +45,8 @@ func TestAccountTestService_OpenAIImageOAuthHandlesOutputItemDoneFallback(t *tes err := svc.testOpenAIImageOAuth(c, context.Background(), account, "gpt-image-2", "draw a cat") require.NoError(t, err) + require.NotNil(t, upstream.lastReq) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Contains(t, rec.Body.String(), "Calling Codex /responses image tool") require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=") require.Contains(t, rec.Body.String(), "\"success\":true") @@ -83,6 +85,7 @@ func TestAccountTestService_OpenAIImageAPIKeyUsesConfiguredV1BaseURL(t *testing. err := svc.testOpenAIImageAPIKey(c, context.Background(), account, "gpt-image-2", "draw a cat") require.NoError(t, err) require.NotNil(t, upstream.lastReq) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Equal(t, "https://image-upstream.example/v1/images/generations", upstream.lastReq.URL.String()) require.Equal(t, "Bearer test-api-key", upstream.lastReq.Header.Get("Authorization")) require.Contains(t, rec.Body.String(), "data:image/png;base64,aGVsbG8=") diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go index 9844957a..910567fb 100644 --- a/backend/internal/service/account_test_service_openai_test.go +++ b/backend/internal/service/account_test_service_openai_test.go @@ -129,6 +129,8 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing. err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.NoError(t, err) + require.Len(t, upstream.requests, 1) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.requests[0].Context())) require.NotEmpty(t, repo.updatedExtra) require.Equal(t, 42.0, repo.updatedExtra["codex_5h_used_percent"]) require.Equal(t, 88.0, repo.updatedExtra["codex_7d_used_percent"]) @@ -372,6 +374,7 @@ func TestAccountTestService_OpenAIAPIKeyResponsesUnsupportedUsesChatCompletionsP err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "hello", "") require.NoError(t, err) require.NotNil(t, upstream.lastReq) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Equal(t, "https://compat-upstream.example/v1/chat/completions", upstream.lastReq.URL.String()) require.Equal(t, "Bearer sk-test", upstream.lastReq.Header.Get("Authorization")) require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept")) diff --git a/backend/internal/service/http_upstream_profile.go b/backend/internal/service/http_upstream_profile.go new file mode 100644 index 00000000..2d63bbd5 --- /dev/null +++ b/backend/internal/service/http_upstream_profile.go @@ -0,0 +1,42 @@ +package service + +import "context" + +// HTTPUpstreamProfile marks HTTP upstream requests that need provider-specific +// transport policy. +type HTTPUpstreamProfile string + +const ( + HTTPUpstreamProfileDefault HTTPUpstreamProfile = "" + HTTPUpstreamProfileOpenAI HTTPUpstreamProfile = "openai" +) + +type httpUpstreamProfileContextKey struct{} + +// WithHTTPUpstreamProfile injects an upstream transport profile into ctx. +func WithHTTPUpstreamProfile(ctx context.Context, profile HTTPUpstreamProfile) context.Context { + if ctx == nil { + ctx = context.Background() + } + if profile == HTTPUpstreamProfileDefault { + return ctx + } + return context.WithValue(ctx, httpUpstreamProfileContextKey{}, profile) +} + +// HTTPUpstreamProfileFromContext resolves the upstream transport profile from ctx. +func HTTPUpstreamProfileFromContext(ctx context.Context) HTTPUpstreamProfile { + if ctx == nil { + return HTTPUpstreamProfileDefault + } + profile, ok := ctx.Value(httpUpstreamProfileContextKey{}).(HTTPUpstreamProfile) + if !ok { + return HTTPUpstreamProfileDefault + } + switch profile { + case HTTPUpstreamProfileOpenAI: + return profile + default: + return HTTPUpstreamProfileDefault + } +} diff --git a/backend/internal/service/http_upstream_profile_test.go b/backend/internal/service/http_upstream_profile_test.go new file mode 100644 index 00000000..96f0cd31 --- /dev/null +++ b/backend/internal/service/http_upstream_profile_test.go @@ -0,0 +1,21 @@ +package service + +import ( + "context" + "testing" +) + +func TestWithHTTPUpstreamProfile_DefaultKeepsContext(t *testing.T) { + ctx := context.Background() + got := WithHTTPUpstreamProfile(ctx, HTTPUpstreamProfileDefault) + if got != ctx { + t.Fatal("default profile should not wrap context") + } +} + +func TestWithHTTPUpstreamProfile_OpenAI(t *testing.T) { + ctx := WithHTTPUpstreamProfile(context.TODO(), HTTPUpstreamProfileOpenAI) + if profile := HTTPUpstreamProfileFromContext(ctx); profile != HTTPUpstreamProfileOpenAI { + t.Fatalf("expected profile %q, got %q", HTTPUpstreamProfileOpenAI, profile) + } +} diff --git a/backend/internal/service/openai_apikey_responses_probe.go b/backend/internal/service/openai_apikey_responses_probe.go index a4eb9252..051527f3 100644 --- a/backend/internal/service/openai_apikey_responses_probe.go +++ b/backend/internal/service/openai_apikey_responses_probe.go @@ -95,6 +95,7 @@ func (s *AccountTestService) ProbeOpenAIAPIKeyResponsesSupport(ctx context.Conte logger.LegacyPrintf("service.openai_probe", "probe_build_request_failed: account_id=%d err=%v", accountID, err) return } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+apiKey) req.Header.Set("Accept", "application/json") diff --git a/backend/internal/service/openai_gateway_chat_completions_raw.go b/backend/internal/service/openai_gateway_chat_completions_raw.go index ad6d3e8d..19f99f69 100644 --- a/backend/internal/service/openai_gateway_chat_completions_raw.go +++ b/backend/internal/service/openai_gateway_chat_completions_raw.go @@ -135,6 +135,7 @@ func (s *OpenAIGatewayService) forwardAsRawChatCompletions( if err != nil { return nil, fmt.Errorf("build upstream request: %w", err) } + upstreamReq = upstreamReq.WithContext(WithHTTPUpstreamProfile(upstreamReq.Context(), HTTPUpstreamProfileOpenAI)) upstreamReq.Header.Set("Content-Type", "application/json") upstreamReq.Header.Set("Authorization", "Bearer "+apiKey) if clientStream { diff --git a/backend/internal/service/openai_gateway_chat_completions_raw_test.go b/backend/internal/service/openai_gateway_chat_completions_raw_test.go index 64449636..8e5fab20 100644 --- a/backend/internal/service/openai_gateway_chat_completions_raw_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_raw_test.go @@ -116,6 +116,7 @@ func TestForwardAsRawChatCompletions_ForcesStreamUsageUpstreamAndPassesUsageDown require.Equal(t, 3, result.Usage.CacheReadInputTokens) require.NotNil(t, upstream.lastReq) require.NoError(t, upstream.lastReq.Context().Err()) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.True(t, gjson.GetBytes(upstream.lastBody, "stream_options.include_usage").Bool()) require.Contains(t, rec.Body.String(), `"usage"`) require.Contains(t, rec.Body.String(), "data: [DONE]") diff --git a/backend/internal/service/openai_gateway_responses_chat_fallback.go b/backend/internal/service/openai_gateway_responses_chat_fallback.go index c3ebc35c..91203bc1 100644 --- a/backend/internal/service/openai_gateway_responses_chat_fallback.go +++ b/backend/internal/service/openai_gateway_responses_chat_fallback.go @@ -116,6 +116,7 @@ func (s *OpenAIGatewayService) forwardResponsesViaRawChatCompletions( if err != nil { return nil, fmt.Errorf("build upstream request: %w", err) } + upstreamReq = upstreamReq.WithContext(WithHTTPUpstreamProfile(upstreamReq.Context(), HTTPUpstreamProfileOpenAI)) upstreamReq.Header.Set("Content-Type", "application/json") upstreamReq.Header.Set("Authorization", "Bearer "+apiKey) if clientStream { diff --git a/backend/internal/service/openai_gateway_responses_chat_fallback_test.go b/backend/internal/service/openai_gateway_responses_chat_fallback_test.go index 78df2202..abb645e8 100644 --- a/backend/internal/service/openai_gateway_responses_chat_fallback_test.go +++ b/backend/internal/service/openai_gateway_responses_chat_fallback_test.go @@ -42,6 +42,7 @@ func TestForwardResponses_ForceChatCompletionsRoutesNonStreamingToChatCompletion require.NoError(t, err) require.NotNil(t, result) require.Equal(t, "http://upstream.example/v1/chat/completions", upstream.lastReq.URL.String()) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Equal(t, "hello", gjson.GetBytes(upstream.lastBody, "messages.0.content").String()) require.False(t, gjson.GetBytes(upstream.lastBody, "input").Exists()) require.Equal(t, "response", gjson.Get(rec.Body.String(), "object").String()) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index f312f50d..c8c7eead 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -3229,6 +3229,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough( if err != nil { return nil, err } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) // 透传客户端请求头(安全白名单)。 allowTimeoutHeaders := s.isOpenAIPassthroughTimeoutHeadersAllowed() @@ -3951,6 +3952,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. if err != nil { return nil, err } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) // Set authentication header req.Header.Set("authorization", "Bearer "+token) diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 013d7a08..ef35aa1a 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1841,6 +1841,7 @@ func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCompactPath(t *test require.Equal(t, "application/json", req.Header.Get("Accept")) require.Equal(t, codexCLIVersion, req.Header.Get("Version")) require.NotEmpty(t, req.Header.Get("Session_Id")) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(req.Context())) } func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) { @@ -1861,6 +1862,7 @@ func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) require.Equal(t, "application/json", req.Header.Get("Accept")) require.Equal(t, codexCLIVersion, req.Header.Get("Version")) require.NotEmpty(t, req.Header.Get("Session_Id")) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(req.Context())) } func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) { diff --git a/backend/internal/service/openai_images.go b/backend/internal/service/openai_images.go index 95c054c9..19066f1d 100644 --- a/backend/internal/service/openai_images.go +++ b/backend/internal/service/openai_images.go @@ -743,6 +743,7 @@ func (s *OpenAIGatewayService) buildOpenAIImagesRequest( if err != nil { return nil, err } + req = req.WithContext(WithHTTPUpstreamProfile(req.Context(), HTTPUpstreamProfileOpenAI)) req.Header.Set("Authorization", "Bearer "+token) for key, values := range c.Request.Header { if !openaiPassthroughAllowedHeaders[strings.ToLower(key)] { diff --git a/backend/internal/service/openai_images_test.go b/backend/internal/service/openai_images_test.go index 52903a1b..854e9f6d 100644 --- a/backend/internal/service/openai_images_test.go +++ b/backend/internal/service/openai_images_test.go @@ -528,6 +528,7 @@ func TestOpenAIGatewayServiceForwardImages_OAuthPassesNAndReturnsAllImages(t *te require.NotNil(t, upstream.lastReq) require.Equal(t, chatgptCodexURL, upstream.lastReq.URL.String()) require.Equal(t, "chatgpt.com", upstream.lastReq.Host) + require.Equal(t, HTTPUpstreamProfileOpenAI, HTTPUpstreamProfileFromContext(upstream.lastReq.Context())) require.Equal(t, "application/json", upstream.lastReq.Header.Get("Content-Type")) require.Equal(t, "text/event-stream", upstream.lastReq.Header.Get("Accept")) require.Equal(t, "acct-123", upstream.lastReq.Header.Get("chatgpt-account-id")) diff --git a/deploy/.env.example b/deploy/.env.example index b38c6305..e80663ef 100644 --- a/deploy/.env.example +++ b/deploy/.env.example @@ -251,6 +251,14 @@ RATE_LIMIT_OVERLOAD_COOLDOWN_MINUTES=10 # # 默认:false GATEWAY_FORCE_CODEX_CLI=false +# OpenAI/Codex 等待上游响应头超时(秒);0 表示不使用本地响应头超时截断。 +GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=0 +# OpenAI HTTP 上游默认启用 HTTP/2;如需紧急回滚可设为 false。 +GATEWAY_OPENAI_HTTP2_ENABLED=true +GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=true +GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=2 +GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=60 +GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=600 # 上游连接池:每主机最大连接数(默认 1024;流式/HTTP1.1 场景可调大,如 2400/4096) GATEWAY_MAX_CONNS_PER_HOST=2048 # 上游连接池:最大空闲连接总数(默认 2560;账号/代理隔离 + 高并发场景可调大) diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 8e9b0e3b..bef5dc0a 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -149,6 +149,9 @@ gateway: # Timeout for waiting upstream response headers (seconds) # 等待上游响应头超时时间(秒) response_header_timeout: 600 + # OpenAI/Codex upstream response header timeout (seconds, 0=disabled) + # OpenAI/Codex 等待上游响应头超时时间(秒,0=禁用本地响应头超时) + openai_response_header_timeout: 0 # Max request body size in bytes (default: 256MB) # 请求体最大字节数(默认 256MB) max_body_size: 268435456 @@ -317,6 +320,14 @@ gateway: queue: 0.7 error_rate: 0.8 ttft: 0.5 + # OpenAI HTTP upstream protocol strategy. + # OpenAI HTTP 上游协议策略(默认 HTTP/2;代理明确不兼容时可临时回退 HTTP/1.1)。 + openai_http2: + enabled: true + allow_proxy_fallback_to_http1: true + fallback_error_threshold: 2 + fallback_window_seconds: 60 + fallback_ttl_seconds: 600 # HTTP upstream connection pool settings (HTTP/2 + multi-proxy scenario defaults) # HTTP 上游连接池配置(HTTP/2 + 多代理场景默认值) # Max idle connections across all hosts diff --git a/deploy/docker-compose.dev.yml b/deploy/docker-compose.dev.yml index b7a805b5..47e0bcad 100644 --- a/deploy/docker-compose.dev.yml +++ b/deploy/docker-compose.dev.yml @@ -40,6 +40,13 @@ services: - JWT_SECRET=${JWT_SECRET:-} - TOTP_ENCRYPTION_KEY=${TOTP_ENCRYPTION_KEY:-} - TZ=${TZ:-Asia/Shanghai} + # OpenAI HTTP upstream protocol/timeout + - GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0} + - GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true} + - GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true} + - GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2} + - GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60} + - GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600} - GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900} - GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10} - GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false} diff --git a/deploy/docker-compose.local.yml b/deploy/docker-compose.local.yml index ca915112..ec5a66da 100644 --- a/deploy/docker-compose.local.yml +++ b/deploy/docker-compose.local.yml @@ -151,6 +151,13 @@ services: # ======================================================================= # Image Generation Stream & Concurrency # ======================================================================= + # OpenAI HTTP upstream protocol/timeout + - GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0} + - GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true} + - GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true} + - GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2} + - GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60} + - GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600} - GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900} - GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10} - GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false} diff --git a/deploy/docker-compose.standalone.yml b/deploy/docker-compose.standalone.yml index 44383dbe..32afb28d 100644 --- a/deploy/docker-compose.standalone.yml +++ b/deploy/docker-compose.standalone.yml @@ -98,6 +98,13 @@ services: # ======================================================================= # Image Generation Stream & Concurrency # ======================================================================= + # OpenAI HTTP upstream protocol/timeout + - GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0} + - GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true} + - GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true} + - GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2} + - GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60} + - GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600} - GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900} - GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10} - GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false} diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml index 37b13ac1..fd682a87 100644 --- a/deploy/docker-compose.yml +++ b/deploy/docker-compose.yml @@ -147,6 +147,13 @@ services: # ======================================================================= # Image Generation Stream & Concurrency # ======================================================================= + # OpenAI HTTP upstream protocol/timeout + - GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT=${GATEWAY_OPENAI_RESPONSE_HEADER_TIMEOUT:-0} + - GATEWAY_OPENAI_HTTP2_ENABLED=${GATEWAY_OPENAI_HTTP2_ENABLED:-true} + - GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=${GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1:-true} + - GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2} + - GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60} + - GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600} - GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900} - GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10} - GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false}