diff --git a/Dockerfile b/Dockerfile index 7befb464..d556008b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,8 +20,8 @@ FROM ${NODE_IMAGE} AS frontend-builder WORKDIR /app/frontend -# Install pnpm -RUN corepack enable && corepack prepare pnpm@latest --activate +# Install pnpm (pinned to v9 to match CI and keep builds reproducible) +RUN corepack enable && corepack prepare pnpm@9 --activate # Install dependencies first (better caching) COPY frontend/package.json frontend/pnpm-lock.yaml ./ diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 65836a7e..35de1bff 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -325,6 +325,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制 if err != nil { if len(fs.FailedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) reqLog.Warn("gateway.select_account_no_available", zap.String("model", reqModel), zap.Int64p("group_id", apiKey.GroupID), @@ -374,6 +375,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { accountReleaseFunc := selection.ReleaseFunc if !selection.Acquired { if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) reqLog.Warn("gateway.select_account_no_slot_no_wait_plan", zap.Int64("account_id", account.ID), zap.String("model", reqModel), @@ -566,6 +568,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), currentAPIKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, parsedReq.MetadataUserID, subject.UserID) if err != nil { if len(fs.FailedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) reqLog.Warn("gateway.select_account_no_available", zap.String("model", reqModel), zap.Int64p("group_id", currentAPIKey.GroupID), @@ -626,6 +629,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { accountReleaseFunc := selection.ReleaseFunc if !selection.Acquired { if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) reqLog.Warn("gateway.select_account_no_slot_no_wait_plan", zap.Int64("account_id", account.ID), zap.String("model", reqModel), @@ -1542,6 +1546,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) { account, err := h.gatewayService.SelectAccountForModel(c.Request.Context(), apiKey.GroupID, sessionHash, parsedReq.Model) if err != nil { reqLog.Warn("gateway.count_tokens_select_account_failed", zap.Error(err)) + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable") return } diff --git a/backend/internal/handler/gateway_handler_chat_completions.go b/backend/internal/handler/gateway_handler_chat_completions.go index c6b73190..00c8ac37 100644 --- a/backend/internal/handler/gateway_handler_chat_completions.go +++ b/backend/internal/handler/gateway_handler_chat_completions.go @@ -169,6 +169,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0)) if err != nil { if len(fs.FailedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error()) return } @@ -194,6 +195,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { accountReleaseFunc := selection.ReleaseFunc if !selection.Acquired { if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts") return } diff --git a/backend/internal/handler/gateway_handler_responses.go b/backend/internal/handler/gateway_handler_responses.go index a97f572d..b8a2af8e 100644 --- a/backend/internal/handler/gateway_handler_responses.go +++ b/backend/internal/handler/gateway_handler_responses.go @@ -174,6 +174,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0)) if err != nil { if len(fs.FailedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error()) return } @@ -199,6 +200,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { accountReleaseFunc := selection.ReleaseFunc if !selection.Acquired { if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts") return } diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 90ebe9ec..3395eeec 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -61,6 +61,7 @@ func (h *GatewayHandler) GeminiV1BetaListModels(c *gin.Context) { c.JSON(http.StatusOK, gemini.FallbackModelsList()) return } + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } @@ -113,6 +114,7 @@ func (h *GatewayHandler) GeminiV1BetaGetModel(c *gin.Context) { c.JSON(http.StatusOK, gemini.FallbackModel(modelName)) return } + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } @@ -372,6 +374,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制 if err != nil { if len(fs.FailedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } @@ -419,6 +422,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { accountReleaseFunc := selection.ReleaseFunc if !selection.Acquired { if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts") return } diff --git a/backend/internal/handler/openai_chat_completions.go b/backend/internal/handler/openai_chat_completions.go index de384710..c85cd35d 100644 --- a/backend/internal/handler/openai_chat_completions.go +++ b/backend/internal/handler/openai_chat_completions.go @@ -143,6 +143,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) { zap.Int("excluded_account_count", len(failedAccountIDs)), ) if len(failedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted) return } else { @@ -155,6 +156,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) { } } if selection == nil || selection.Account == nil { + markOpsRoutingCapacityLimited(c) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted) return } diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 6b07b7ba..dcd737af 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -282,6 +282,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { zap.Int("excluded_account_count", len(failedAccountIDs)), ) if len(failedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) if errors.Is(err, service.ErrNoAvailableCompactAccounts) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "compact_not_supported", "No available OpenAI accounts support /responses/compact", streamStarted) return @@ -297,6 +298,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { return } if selection == nil || selection.Account == nil { + markOpsRoutingCapacityLimited(c) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted) return } @@ -677,6 +679,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { ) if len(failedAccountIDs) == 0 { if err != nil { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted) return } @@ -690,6 +693,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { } } if selection == nil || selection.Account == nil { + markOpsRoutingCapacityLimited(c) h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted) return } @@ -992,6 +996,7 @@ func (h *OpenAIGatewayHandler) acquireResponsesAccountSlot( reqLog *zap.Logger, ) (func(), bool) { if selection == nil || selection.Account == nil { + markOpsRoutingCapacityLimited(c) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", *streamStarted) return nil, false } @@ -1002,6 +1007,7 @@ func (h *OpenAIGatewayHandler) acquireResponsesAccountSlot( return wrapReleaseOnDone(ctx, selection.ReleaseFunc), true } if selection.WaitPlan == nil { + markOpsRoutingCapacityLimited(c) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", *streamStarted) return nil, false } diff --git a/backend/internal/handler/openai_images.go b/backend/internal/handler/openai_images.go index 08a6b6e8..be19a035 100644 --- a/backend/internal/handler/openai_images.go +++ b/backend/internal/handler/openai_images.go @@ -157,6 +157,7 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) { zap.Int("excluded_account_count", len(failedAccountIDs)), ) if len(failedAccountIDs) == 0 { + markOpsRoutingCapacityLimitedIfNoAvailable(c, err) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available compatible accounts", streamStarted) return } @@ -168,6 +169,7 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) { return } if selection == nil || selection.Account == nil { + markOpsRoutingCapacityLimited(c) h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available compatible accounts", streamStarted) return } diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index 93554912..398124cc 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "log" "runtime" "runtime/debug" @@ -22,10 +23,11 @@ import ( ) const ( - opsModelKey = "ops_model" - opsStreamKey = "ops_stream" - opsRequestBodyKey = "ops_request_body" - opsAccountIDKey = "ops_account_id" + opsModelKey = "ops_model" + opsStreamKey = "ops_stream" + opsRequestBodyKey = "ops_request_body" + opsAccountIDKey = "ops_account_id" + opsRoutingCapacityLimitedKey = "ops_routing_capacity_limited" opsUpstreamModelKey = "ops_upstream_model" opsRequestTypeKey = "ops_request_type" @@ -45,6 +47,8 @@ const ( opsCodeSubscriptionNotFound = "SUBSCRIPTION_NOT_FOUND" opsCodeSubscriptionInvalid = "SUBSCRIPTION_INVALID" opsCodeUserInactive = "USER_INACTIVE" + opsCodeInvalidAPIKey = "INVALID_API_KEY" + opsCodeAPIKeyRequired = "API_KEY_REQUIRED" ) const ( @@ -393,6 +397,42 @@ func setOpsSelectedAccount(c *gin.Context, accountID int64, platform ...string) } } +func markOpsRoutingCapacityLimited(c *gin.Context) { + if c == nil { + return + } + c.Set(opsRoutingCapacityLimitedKey, true) +} + +func markOpsRoutingCapacityLimitedIfNoAvailable(c *gin.Context, err error) { + if !isOpsNoAvailableAccountError(err) { + return + } + markOpsRoutingCapacityLimited(c) +} + +func isOpsRoutingCapacityLimited(c *gin.Context) bool { + if c == nil { + return false + } + v, ok := c.Get(opsRoutingCapacityLimitedKey) + if !ok { + return false + } + marked, _ := v.(bool) + return marked +} + +func isOpsNoAvailableAccountError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, service.ErrNoAvailableAccounts) || errors.Is(err, service.ErrNoAvailableCompactAccounts) { + return true + } + return isOpsNoAvailableAccountMessage(err.Error()) +} + type opsCaptureWriter struct { gin.ResponseWriter limit int @@ -775,11 +815,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { normalizedType := normalizeOpsErrorType(parsed.ErrorType, parsed.Code) - phase := classifyOpsPhase(normalizedType, parsed.Message, parsed.Code) - isBusinessLimited := classifyOpsIsBusinessLimited(normalizedType, phase, parsed.Code, status, parsed.Message) - - errorOwner := classifyOpsErrorOwner(phase, parsed.Message) - errorSource := classifyOpsErrorSource(phase, parsed.Message) + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, normalizedType, parsed.Message, parsed.Code, status) entry := &service.OpsInsertErrorLogInput{ RequestID: requestID, @@ -1114,6 +1150,9 @@ func classifyOpsPhase(errType, message, code string) string { msg := strings.ToLower(message) // Standardized phases: request|auth|routing|upstream|network|internal // Map billing/concurrency/response => request; scheduling => routing. + if isOpsClientAuthError(code, msg) { + return "auth" + } switch strings.TrimSpace(code) { case opsCodeInsufficientBalance, opsCodeUsageLimitExceeded, opsCodeSubscriptionNotFound, opsCodeSubscriptionInvalid: return "request" @@ -1134,7 +1173,7 @@ func classifyOpsPhase(errType, message, code string) string { case "upstream_error", "overloaded_error": return "upstream" case "api_error": - if strings.Contains(msg, opsErrNoAvailableAccounts) { + if isOpsNoAvailableAccountMessage(msg) { return "routing" } return "internal" @@ -1178,7 +1217,27 @@ func classifyOpsIsRetryable(errType string, statusCode int) bool { } } -func classifyOpsIsBusinessLimited(errType, phase, code string, status int, message string) bool { +func classifyOpsErrorLog(c *gin.Context, errType, message, code string, status int) (phase string, isBusinessLimited bool, errorOwner string, errorSource string) { + phase = classifyOpsPhase(errType, message, code) + routingCapacityLimited := isOpsRoutingCapacityLimited(c) + upstreamError := hasOpsUpstreamErrorContext(c) + if upstreamError && !routingCapacityLimited { + phase = "upstream" + } + if routingCapacityLimited { + phase = "routing" + } + localClientAuthError := !upstreamError && phase == "auth" && isOpsClientAuthError(code, strings.ToLower(message)) + isBusinessLimited = routingCapacityLimited || classifyOpsIsBusinessLimited(errType, phase, code, status, message, localClientAuthError) + errorOwner = classifyOpsErrorOwner(phase, message) + errorSource = classifyOpsErrorSource(phase, message) + return phase, isBusinessLimited, errorOwner, errorSource +} + +func classifyOpsIsBusinessLimited(errType, phase, code string, status int, message string, localClientAuthError ...bool) bool { + if len(localClientAuthError) > 0 && localClientAuthError[0] { + return true + } switch strings.TrimSpace(code) { case opsCodeInsufficientBalance, opsCodeUsageLimitExceeded, opsCodeSubscriptionNotFound, opsCodeSubscriptionInvalid, opsCodeUserInactive: return true @@ -1195,6 +1254,47 @@ func classifyOpsIsBusinessLimited(errType, phase, code string, status int, messa return false } +func isOpsClientAuthError(code string, msg string) bool { + switch strings.TrimSpace(code) { + case opsCodeInvalidAPIKey, opsCodeAPIKeyRequired: + return true + } + return strings.Contains(msg, "invalid api key") || strings.Contains(msg, "api key is required") +} + +func hasOpsUpstreamErrorContext(c *gin.Context) bool { + if c == nil { + return false + } + if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok { + switch code := v.(type) { + case int: + if code > 0 { + return true + } + case int64: + if code > 0 { + return true + } + } + } + if v, ok := c.Get(service.OpsUpstreamErrorsKey); ok { + if events, ok := v.([]*service.OpsUpstreamErrorEvent); ok && len(events) > 0 { + return true + } + } + return false +} + +func isOpsNoAvailableAccountMessage(message string) bool { + msg := strings.ToLower(message) + return strings.Contains(msg, opsErrNoAvailableAccounts) || + strings.Contains(msg, "no available account") || + strings.Contains(msg, "no available gemini accounts") || + strings.Contains(msg, "no available openai accounts") || + strings.Contains(msg, "no available compatible accounts") +} + func classifyOpsErrorOwner(phase string, message string) string { // Standardized owners: client|provider|platform switch phase { diff --git a/backend/internal/handler/ops_error_logger_test.go b/backend/internal/handler/ops_error_logger_test.go index 6ae45110..e1df03cc 100644 --- a/backend/internal/handler/ops_error_logger_test.go +++ b/backend/internal/handler/ops_error_logger_test.go @@ -275,6 +275,187 @@ func TestNormalizeOpsErrorType(t *testing.T) { } } +func TestClassifyOpsNoAvailableAccountsExcludedFromSLA(t *testing.T) { + const message = "No available accounts" + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + markOpsRoutingCapacityLimited(c) + + errType := normalizeOpsErrorType("api_error", "") + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, message, "", http.StatusServiceUnavailable) + + require.Equal(t, "api_error", errType) + require.Equal(t, "routing", phase) + require.True(t, isBusinessLimited) + require.Equal(t, "platform", errorOwner) + require.Equal(t, "gateway", errorSource) +} + +func TestClassifyOpsRoutingCapacityMarkerExcludesMaskedSelectionFailureFromSLA(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + markOpsRoutingCapacityLimited(c) + + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog( + c, + "api_error", + "Service temporarily unavailable", + "", + http.StatusServiceUnavailable, + ) + + require.Equal(t, "routing", phase) + require.True(t, isBusinessLimited) + require.Equal(t, "platform", errorOwner) + require.Equal(t, "gateway", errorSource) +} + +func TestClassifyOpsAuthClientErrorsExcludedFromSLA(t *testing.T) { + tests := []struct { + name string + errType string + message string + code string + status int + }{ + { + name: "standard invalid API key", + errType: "api_error", + message: "Invalid API key", + code: "INVALID_API_KEY", + status: http.StatusUnauthorized, + }, + { + name: "standard missing API key", + errType: "api_error", + message: "API key is required in Authorization header (Bearer scheme), x-api-key header, or x-goog-api-key header", + code: "API_KEY_REQUIRED", + status: http.StatusUnauthorized, + }, + { + name: "google invalid API key", + errType: "api_error", + message: "Invalid API key", + code: "401", + status: http.StatusUnauthorized, + }, + { + name: "google missing API key", + errType: "api_error", + message: "API key is required", + code: "401", + status: http.StatusUnauthorized, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + errType := normalizeOpsErrorType(tt.errType, tt.code) + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, tt.message, tt.code, tt.status) + + require.Equal(t, "api_error", errType) + require.Equal(t, "auth", phase) + require.True(t, isBusinessLimited) + require.Equal(t, "client", errorOwner) + require.Equal(t, "client_request", errorSource) + }) + } +} + +func TestClassifyOpsUnsupportedModelExcludedFromSLA(t *testing.T) { + tests := []string{ + "No available accounts: no available accounts supporting model: made-up-model", + "No available accounts: no available OpenAI accounts supporting model: made-up-model", + "No available Gemini accounts: no available Gemini accounts supporting model: made-up-model", + "No available accounts: no available accounts supporting model: made-up-model (channel pricing restriction)", + } + + for _, message := range tests { + t.Run(message, func(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + markOpsRoutingCapacityLimited(c) + + errType := normalizeOpsErrorType("api_error", "") + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, message, "", http.StatusServiceUnavailable) + + require.Equal(t, "api_error", errType) + require.Equal(t, "routing", phase) + require.True(t, isBusinessLimited) + require.Equal(t, "platform", errorOwner) + require.Equal(t, "gateway", errorSource) + }) + } +} + +func TestClassifyOpsUnmarkedNoAvailableTextStillCountsForSLA(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog( + c, + "api_error", + "No available accounts", + "", + http.StatusServiceUnavailable, + ) + + require.Equal(t, "routing", phase) + require.False(t, isBusinessLimited) + require.Equal(t, "platform", errorOwner) + require.Equal(t, "gateway", errorSource) +} + +func TestClassifyOpsUpstreamAuthTextStillCountsForSLA(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + service.SetOpsUpstreamError(c, http.StatusUnauthorized, "Invalid API key", "") + + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog( + c, + "api_error", + "Invalid API key", + "401", + http.StatusUnauthorized, + ) + + require.Equal(t, "upstream", phase) + require.False(t, isBusinessLimited) + require.Equal(t, "provider", errorOwner) + require.Equal(t, "upstream_http", errorSource) +} + +func TestClassifyOpsUpstreamNoAvailableTextStillCountsForSLA(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + service.SetOpsUpstreamError(c, http.StatusServiceUnavailable, "No available accounts", "") + + phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog( + c, + "api_error", + "No available accounts", + "", + http.StatusServiceUnavailable, + ) + + require.Equal(t, "upstream", phase) + require.False(t, isBusinessLimited) + require.Equal(t, "provider", errorOwner) + require.Equal(t, "upstream_http", errorSource) +} + func TestSetOpsEndpointContext_SetsContextKeys(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() diff --git a/backend/internal/payment/provider/alipay.go b/backend/internal/payment/provider/alipay.go index 1234b568..c4c6e634 100644 --- a/backend/internal/payment/provider/alipay.go +++ b/backend/internal/payment/provider/alipay.go @@ -105,10 +105,16 @@ func (a *Alipay) MerchantIdentityMetadata() map[string]string { // CreatePayment creates an Alipay payment using the following routing: // - Mobile (H5): alipay.trade.wap.pay — browser redirect into Alipay. -// - Desktop: prefer alipay.trade.precreate to get a scan payload directly. -// - Desktop fallback: if precreate is unavailable for the merchant, fall back -// to alipay.trade.page.pay and expose both pay_url and qr_code so the -// frontend can render a QR while still allowing direct page open. +// - Desktop, default: prefer alipay.trade.precreate (FACE_TO_FACE_PAYMENT) to +// get a scannable QR payload. If precreate is unavailable for the merchant, +// fall back to alipay.trade.page.pay and expose pay_url only — the frontend +// opens the Alipay checkout in a new tab. +// - Desktop, paymentMode == "redirect": skip precreate and go straight to +// alipay.trade.page.pay so the frontend always opens the Alipay checkout +// in a new tab. Use this when the merchant has not enabled FACE_TO_FACE_PAYMENT. +// +// Note: alipay.trade.page.pay returns a checkout page URL, not a scannable +// payment QR. Never expose it via the QRCode field. func (a *Alipay) CreatePayment(ctx context.Context, req payment.CreatePaymentRequest) (*payment.CreatePaymentResponse, error) { client, err := a.getClient() if err != nil { @@ -150,6 +156,13 @@ func (a *Alipay) createWapTrade(client *alipay.Client, req payment.CreatePayment } func (a *Alipay) createDesktopTrade(ctx context.Context, client *alipay.Client, req payment.CreatePaymentRequest, notifyURL, returnURL string) (*payment.CreatePaymentResponse, error) { + // Explicit redirect mode: merchant opted into "always open the Alipay + // checkout page in a new tab" via the provider instance's payment_mode. + // Skip precreate to avoid a wasted API call. + if strings.EqualFold(strings.TrimSpace(a.config["paymentMode"]), "redirect") { + return a.createPagePayTrade(client, req, notifyURL, returnURL) + } + resp, precreateErr := a.createPrecreateTrade(ctx, client, req, notifyURL) if precreateErr == nil { return resp, nil @@ -204,10 +217,12 @@ func (a *Alipay) createPagePayTrade(client *alipay.Client, req payment.CreatePay if err != nil { return nil, fmt.Errorf("alipay TradePagePay: %w", err) } + // Only PayURL is exposed: alipay.trade.page.pay returns a checkout page URL + // that must be opened in a browser, not a scannable payment QR. Setting it + // as QRCode would let the frontend render an unscannable image. return &payment.CreatePaymentResponse{ TradeNo: req.OrderID, PayURL: payURL.String(), - QRCode: payURL.String(), }, nil } diff --git a/backend/internal/payment/provider/alipay_test.go b/backend/internal/payment/provider/alipay_test.go index fdc8eec1..9f8aec53 100644 --- a/backend/internal/payment/provider/alipay_test.go +++ b/backend/internal/payment/provider/alipay_test.go @@ -189,8 +189,63 @@ func TestCreateTradeUsesPagePayForDesktop(t *testing.T) { if resp.PayURL == "" { t.Fatal("expected pay_url for desktop page pay") } - if resp.QRCode != resp.PayURL { - t.Fatalf("qr_code = %q, want same as pay_url %q", resp.QRCode, resp.PayURL) + // page.pay returns a checkout page URL, not a scannable QR payload — + // it must never be exposed via QRCode (the frontend would render an + // unscannable image from it). + if resp.QRCode != "" { + t.Fatalf("qr_code = %q, want empty for page pay", resp.QRCode) + } +} + +// When the provider instance is configured with paymentMode == "redirect", +// the desktop flow must skip precreate and go straight to page.pay. +func TestCreateTradeRedirectModeSkipsPrecreate(t *testing.T) { + origPreCreate := alipayTradePreCreate + origPagePay := alipayTradePagePay + t.Cleanup(func() { + alipayTradePreCreate = origPreCreate + alipayTradePagePay = origPagePay + }) + + preCreateCalls := 0 + pagePayCalls := 0 + alipayTradePreCreate = func(ctx context.Context, client *alipay.Client, param alipay.TradePreCreate) (*alipay.TradePreCreateRsp, error) { + preCreateCalls++ + return &alipay.TradePreCreateRsp{ + Error: alipay.Error{Code: alipay.CodeSuccess}, + QRCode: "https://qr.alipay.example.com/precreate-token", + }, nil + } + alipayTradePagePay = func(client *alipay.Client, param alipay.TradePagePay) (*url.URL, error) { + pagePayCalls++ + if param.ProductCode != alipayProductCodePagePay { + t.Fatalf("product_code = %q, want %q", param.ProductCode, alipayProductCodePagePay) + } + return url.Parse("https://openapi.alipay.com/gateway.do?page-pay") + } + + provider := &Alipay{ + config: map[string]string{"paymentMode": "redirect"}, + } + resp, err := provider.createDesktopTrade(context.Background(), &alipay.Client{}, payment.CreatePaymentRequest{ + OrderID: "sub2_103", + Amount: "12.00", + Subject: "Balance recharge", + }, "https://merchant.example.com/api/v1/payment/webhook/alipay", "https://merchant.example.com/payment/result") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if preCreateCalls != 0 { + t.Fatalf("precreate calls = %d, want 0 (redirect mode must skip precreate)", preCreateCalls) + } + if pagePayCalls != 1 { + t.Fatalf("page pay calls = %d, want 1", pagePayCalls) + } + if resp.PayURL == "" { + t.Fatal("expected pay_url for redirect mode") + } + if resp.QRCode != "" { + t.Fatalf("qr_code = %q, want empty for redirect mode", resp.QRCode) } } diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index f9cd5a1c..df75ce50 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -306,6 +306,37 @@ type ResponsesUsage struct { OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"` } +func (u *ResponsesUsage) UnmarshalJSON(data []byte) error { + type responsesUsageAlias ResponsesUsage + var aux struct { + responsesUsageAlias + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + PromptTokensDetails *ResponsesInputTokensDetails `json:"prompt_tokens_details,omitempty"` + CompletionTokensDetails *ResponsesOutputTokensDetails `json:"completion_tokens_details,omitempty"` + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + *u = ResponsesUsage(aux.responsesUsageAlias) + if u.InputTokens == 0 && aux.PromptTokens != 0 { + u.InputTokens = aux.PromptTokens + } + if u.OutputTokens == 0 && aux.CompletionTokens != 0 { + u.OutputTokens = aux.CompletionTokens + } + if u.InputTokensDetails == nil && aux.PromptTokensDetails != nil { + u.InputTokensDetails = aux.PromptTokensDetails + } + if u.OutputTokensDetails == nil && aux.CompletionTokensDetails != nil { + u.OutputTokensDetails = aux.CompletionTokensDetails + } + if u.TotalTokens == 0 && (u.InputTokens != 0 || u.OutputTokens != 0) { + u.TotalTokens = u.InputTokens + u.OutputTokens + } + return nil +} + // ResponsesInputTokensDetails breaks down input token usage. type ResponsesInputTokensDetails struct { CachedTokens int `json:"cached_tokens,omitempty"` diff --git a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go index 428231ee..87cfc4f8 100644 --- a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go +++ b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go @@ -1138,6 +1138,99 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingDataIntervalTimeout( require.False(t, result.clientDisconnect) } +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingSendsKeepaliveDuringIdle(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + StreamKeepaliveInterval: 1, + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: pr, + } + + done := make(chan struct{}) + go func() { + defer close(done) + time.Sleep(1200 * time.Millisecond) + _, _ = pw.Write([]byte(strings.Join([]string{ + `data: {"type":"message_start","message":{"usage":{"input_tokens":3}}}`, + "", + `data: {"type":"message_delta","usage":{"output_tokens":2}}`, + "", + "data: [DONE]", + "", + }, "\n"))) + _ = pw.Close() + }() + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 8}, time.Now(), "claude-3-7-sonnet-20250219") + _ = pr.Close() + <-done + + require.NoError(t, err) + require.NotNil(t, result) + require.Contains(t, rec.Body.String(), "event: ping\ndata: {\"type\": \"ping\"}\n\n") + require.Contains(t, rec.Body.String(), "data: [DONE]") +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingKeepaliveDoesNotInterleavePartialEvent(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + StreamKeepaliveInterval: 1, + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: pr, + } + + done := make(chan struct{}) + go func() { + defer close(done) + _, _ = pw.Write([]byte(`data: {"type":"message_start","message":{"usage":{"input_tokens":4}}}` + "\n")) + time.Sleep(1200 * time.Millisecond) + _, _ = pw.Write([]byte("\n")) + _, _ = pw.Write([]byte("data: [DONE]\n\n")) + _ = pw.Close() + }() + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 9}, time.Now(), "claude-3-7-sonnet-20250219") + _ = pr.Close() + <-done + + require.NoError(t, err) + require.NotNil(t, result) + body := rec.Body.String() + require.NotContains(t, body, `data: {"type":"message_start","message":{"usage":{"input_tokens":4}}}`+"\n"+"event: ping") + require.NotContains(t, body, "event: ping") + require.Contains(t, body, "data: [DONE]") +} + func TestGatewayService_AnthropicAPIKeyPassthrough_StreamingReadError(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 6151d78e..b8cbf715 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -5357,6 +5357,22 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( intervalCh = intervalTicker.C } + keepaliveInterval := time.Duration(0) + if s.cfg != nil && s.cfg.Gateway.StreamKeepaliveInterval > 0 { + keepaliveInterval = time.Duration(s.cfg.Gateway.StreamKeepaliveInterval) * time.Second + } + var keepaliveTicker *time.Ticker + if keepaliveInterval > 0 { + keepaliveTicker = time.NewTicker(keepaliveInterval) + defer keepaliveTicker.Stop() + } + var keepaliveCh <-chan time.Time + if keepaliveTicker != nil { + keepaliveCh = keepaliveTicker.C + } + lastDataAt := time.Now() + inPartialEvent := false + for { select { case ev, ok := <-events: @@ -5422,6 +5438,10 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( } else if line == "" { // 按 SSE 事件边界刷出,减少每行 flush 带来的 syscall 开销。 flusher.Flush() + lastDataAt = time.Now() + inPartialEvent = false + } else { + inPartialEvent = true } } @@ -5438,6 +5458,21 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( s.rateLimitService.HandleStreamTimeout(ctx, account, model) } return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") + + case <-keepaliveCh: + if clientDisconnected || inPartialEvent { + continue + } + if time.Since(lastDataAt) < keepaliveInterval { + continue + } + if _, err := fmt.Fprint(w, "event: ping\ndata: {\"type\": \"ping\"}\n\n"); err != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during keepalive ping, continue draining upstream for usage: account=%d", account.ID) + continue + } + flusher.Flush() + lastDataAt = time.Now() } } } diff --git a/backend/internal/service/openai_codex_transform.go b/backend/internal/service/openai_codex_transform.go index a3b69dee..5b5f2bc3 100644 --- a/backend/internal/service/openai_codex_transform.go +++ b/backend/internal/service/openai_codex_transform.go @@ -1030,7 +1030,7 @@ func filterCodexInputWithOptions(input []any, opts codexInputFilterOptions) []an return id } if strings.HasPrefix(id, "call_") { - return "fc" + strings.TrimPrefix(id, "call_") + return "fc_" + strings.TrimPrefix(id, "call_") } return "fc_" + id } diff --git a/backend/internal/service/openai_codex_transform_test.go b/backend/internal/service/openai_codex_transform_test.go index 9c72760a..6fc7f49d 100644 --- a/backend/internal/service/openai_codex_transform_test.go +++ b/backend/internal/service/openai_codex_transform_test.go @@ -41,7 +41,7 @@ func TestApplyCodexOAuthTransform_ToolContinuationPreservesInput(t *testing.T) { second, ok := input[1].(map[string]any) require.True(t, ok) require.Equal(t, "o1", second["id"]) - require.Equal(t, "fc1", second["call_id"]) + require.Equal(t, "fc_1", second["call_id"]) } func TestApplyCodexOAuthTransform_MessagesBridgePromptCacheKeyIsHeaderOnly(t *testing.T) { @@ -120,11 +120,11 @@ func TestApplyCodexOAuthTransform_ToolContinuationNormalizesToolReferenceIDsOnly first, ok := input[0].(map[string]any) require.True(t, ok) - require.Equal(t, "fc1", first["id"]) + require.Equal(t, "fc_1", first["id"]) second, ok := input[1].(map[string]any) require.True(t, ok) - require.Equal(t, "fc1", second["call_id"]) + require.Equal(t, "fc_1", second["call_id"]) } func TestApplyCodexOAuthTransform_ToolSearchOutputPreservesCallID(t *testing.T) { @@ -144,7 +144,7 @@ func TestApplyCodexOAuthTransform_ToolSearchOutputPreservesCallID(t *testing.T) first, ok := input[0].(map[string]any) require.True(t, ok) require.Equal(t, "tool_search_output", first["type"]) - require.Equal(t, "fc1", first["call_id"]) + require.Equal(t, "fc_1", first["call_id"]) } func TestApplyCodexOAuthTransform_CustomAndMCPToolOutputsPreserveCallID(t *testing.T) { @@ -164,11 +164,11 @@ func TestApplyCodexOAuthTransform_CustomAndMCPToolOutputsPreserveCallID(t *testi first, ok := input[0].(map[string]any) require.True(t, ok) - require.Equal(t, "fccustom", first["call_id"]) + require.Equal(t, "fc_custom", first["call_id"]) second, ok := input[1].(map[string]any) require.True(t, ok) - require.Equal(t, "fcmcp", second["call_id"]) + require.Equal(t, "fc_mcp", second["call_id"]) } func TestApplyCodexOAuthTransform_ImageAndWebSearchCallsDoNotGainCallID(t *testing.T) { @@ -221,7 +221,7 @@ func TestApplyCodexOAuthTransform_ConvertsToolRoleMessageToFunctionCallOutput(t item, ok := input[0].(map[string]any) require.True(t, ok) require.Equal(t, "function_call_output", item["type"]) - require.Equal(t, "fc1", item["call_id"]) + require.Equal(t, "fc_1", item["call_id"]) require.Equal(t, "ok", item["output"]) _, hasRole := item["role"] require.False(t, hasRole) @@ -340,7 +340,7 @@ func TestApplyCodexOAuthTransform_AddsFallbackNameForFunctionCallInput(t *testin require.True(t, ok) require.Equal(t, "function_call", item["type"]) require.Equal(t, "tool", item["name"]) - require.Equal(t, "fc1", item["call_id"]) + require.Equal(t, "fc_1", item["call_id"]) } func TestApplyCodexOAuthTransform_PreservesFunctionCallInputName(t *testing.T) { @@ -359,7 +359,7 @@ func TestApplyCodexOAuthTransform_PreservesFunctionCallInputName(t *testing.T) { item, ok := input[0].(map[string]any) require.True(t, ok) require.Equal(t, "shell", item["name"]) - require.Equal(t, "fc1", item["call_id"]) + require.Equal(t, "fc_1", item["call_id"]) } func TestApplyCodexOAuthTransform_PreservesMCPToolCallIDAndName(t *testing.T) { @@ -384,7 +384,7 @@ func TestApplyCodexOAuthTransform_PreservesMCPToolCallIDAndName(t *testing.T) { require.True(t, ok) require.Equal(t, "mcp_tool_call", item["type"]) require.Equal(t, "remote_tool", item["name"]) - require.Equal(t, "fcabc", item["call_id"]) + require.Equal(t, "fc_abc", item["call_id"]) } func TestCodexInputItemRequiresNameTypesAllowCallID(t *testing.T) { diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index e222b093..f8b9d360 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -183,6 +183,63 @@ func TestForwardAsAnthropic_NormalizesRoutingAndEffortForGpt54XHigh(t *testing.T t.Logf("response body: %s", rec.Body.String()) } +func TestForwardAsAnthropic_MappedClaudeModelAcceptsChatUsageShape(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"claude-opus-4-7","max_tokens":16,"messages":[{"role":"user","content":"compact this"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_compact","model":"gpt-5.5","status":"in_progress","output":[]}}`, + "", + `data: {"type":"response.output_text.delta","delta":"ok"}`, + "", + `data: {"type":"response.completed","response":{"id":"resp_compact","object":"response","model":"gpt-5.5","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"prompt_tokens":31,"completion_tokens":9,"total_tokens":40,"prompt_tokens_details":{"cached_tokens":11}}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_compact_usage"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + "model_mapping": map[string]any{ + "gpt-5.5": "gpt-5.5", + }, + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "claude-opus-4-7", result.Model) + require.Equal(t, "gpt-5.5", result.BillingModel) + require.Equal(t, "gpt-5.5", result.UpstreamModel) + require.Equal(t, 31, result.Usage.InputTokens) + require.Equal(t, 9, result.Usage.OutputTokens) + require.Equal(t, 11, result.Usage.CacheReadInputTokens) + require.Equal(t, "gpt-5.5", gjson.GetBytes(upstream.lastBody, "model").String()) +} + func TestForwardAsAnthropic_InjectsPromptCacheKeyForAPIKeyMessagesDispatch(t *testing.T) { t.Parallel() gin.SetMode(gin.TestMode) @@ -1360,6 +1417,135 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing. } } +func TestForwardAsAnthropic_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsAnthropic_EventNamedTerminalWithKeepaliveReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `: upstream ping`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_keepalive"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + StreamKeepaliveInterval: 5, + }}, + httpUpstream: upstream, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic keepalive path should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) @@ -1416,6 +1602,67 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi } } +func TestForwardAsAnthropic_BufferedEventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_buffered_event_named"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"stop_reason":"end_turn"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic buffered response should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_DoneSentinelWithoutTerminalReturnsError(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_chat_completions.go b/backend/internal/service/openai_gateway_chat_completions.go index 84d85c74..5b3c0e6f 100644 --- a/backend/internal/service/openai_gateway_chat_completions.go +++ b/backend/internal/service/openai_gateway_chat_completions.go @@ -554,6 +554,13 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + if strings.TrimSpace(payload) == "[DONE]" { + return false + } + return processDataLine(payload) + } // Determine keepalive interval keepaliveInterval := time.Duration(0) @@ -563,16 +570,17 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( // No keepalive: fast synchronous path if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -580,6 +588,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -624,11 +640,20 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -637,14 +662,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( } lastDataAt = time.Now() line := ev.line - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_chat_completions_test.go b/backend/internal/service/openai_gateway_chat_completions_test.go index b0d1fa31..a26091a3 100644 --- a/backend/internal/service/openai_gateway_chat_completions_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_test.go @@ -236,6 +236,120 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te } } +func TestForwardAsChatCompletions_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `event: response.output_text.delta`, + `data: {"delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 17, got.result.Usage.InputTokens) + require.Equal(t, 8, got.result.Usage.OutputTokens) + require.Equal(t, 6, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsChatCompletions should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsChatCompletions_EventTypeDoesNotLeakAcrossFrames(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `data: {"type":"response.output_text.delta","delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + `data: [DONE]`, + ``, + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_boundary"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + require.Contains(t, rec.Body.String(), `data: [DONE]`) +} + func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index aefa8fd2..6d74f7dd 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -560,10 +560,24 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( }() defer close(done) + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + var event apicompat.ResponsesStreamEvent + if err := json.Unmarshal([]byte(payload), &event); err == nil { + acc.ProcessEvent(&event) + if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil { + if event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } + return event.Response, usage, acc, nil + } + } + } return nil, usage, acc, nil } resetTimeout() @@ -580,10 +594,11 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( if isOpenAICompatDoneSentinelLine(ev.line) { return nil, usage, acc, nil } - payload, ok := extractOpenAISSEDataLine(ev.line) - if !ok || payload == "" { + frame, ok := parser.AddLine(ev.line) + if !ok { continue } + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) var event apicompat.ResponsesStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { @@ -772,6 +787,10 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + return processDataLine(payload) + } // ── Determine keepalive interval ── keepaliveInterval := time.Duration(0) @@ -781,16 +800,17 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( // ── No keepalive: fast synchronous path (no goroutine overhead) ── if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -798,6 +818,14 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -842,12 +870,21 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { // Upstream closed + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -859,11 +896,11 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e..07153083 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1113,6 +1113,9 @@ func isOpenAITransientProcessingError(upstreamStatusCode int, upstreamMsg string if strings.Contains(lower, "an error occurred while processing your request") { return true } + if strings.Contains(lower, "selected model is at capacity") { + return true + } return strings.Contains(lower, "you can retry your request") && strings.Contains(lower, "help.openai.com") && strings.Contains(lower, "request id") @@ -3400,6 +3403,9 @@ func openAIStreamDataStartsClientOutput(data, eventType string) bool { } func openAIStreamFailedEventShouldFailover(payload []byte, message string) bool { + if isOpenAITransientProcessingError(http.StatusBadRequest, message, payload) { + return true + } code := strings.ToLower(strings.TrimSpace(gjson.GetBytes(payload, "response.error.code").String())) if code == "" { code = strings.ToLower(strings.TrimSpace(gjson.GetBytes(payload, "error.code").String())) @@ -4578,6 +4584,76 @@ func extractOpenAISSEDataLine(line string) (string, bool) { return line[start:], true } +func extractOpenAISSEEventLine(line string) (string, bool) { + if !strings.HasPrefix(line, "event:") { + return "", false + } + start := len("event:") + for start < len(line) { + if line[start] != ' ' && line[start] != ' ' { + break + } + start++ + } + return strings.TrimSpace(line[start:]), true +} + +type openAICompatSSEFrame struct { + EventType string + Data string +} + +type openAICompatSSEFrameParser struct { + eventType string + dataLines []string +} + +func (p *openAICompatSSEFrameParser) AddLine(line string) (openAICompatSSEFrame, bool) { + if line == "" { + return p.dispatch() + } + if strings.HasPrefix(line, ":") { + return openAICompatSSEFrame{}, false + } + if eventType, ok := extractOpenAISSEEventLine(line); ok { + p.eventType = eventType + return openAICompatSSEFrame{}, false + } + if data, ok := extractOpenAISSEDataLine(line); ok { + p.dataLines = append(p.dataLines, data) + } + return openAICompatSSEFrame{}, false +} + +func (p *openAICompatSSEFrameParser) Finish() (openAICompatSSEFrame, bool) { + return p.dispatch() +} + +func (p *openAICompatSSEFrameParser) dispatch() (openAICompatSSEFrame, bool) { + frame := openAICompatSSEFrame{ + EventType: p.eventType, + Data: strings.Join(p.dataLines, "\n"), + } + p.eventType = "" + p.dataLines = nil + return frame, frame.Data != "" +} + +func openAICompatPayloadWithEventType(payload, eventType string) string { + eventType = strings.TrimSpace(eventType) + if eventType == "" || strings.TrimSpace(payload) == "" || strings.TrimSpace(payload) == "[DONE]" { + return payload + } + if gjson.Get(payload, "type").Exists() { + return payload + } + patched, err := sjson.Set(payload, "type", eventType) + if err != nil { + return payload + } + return patched +} + func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { data, ok := extractOpenAISSEDataLine(line) if !ok { @@ -4639,28 +4715,47 @@ func (s *OpenAIGatewayService) parseSSEUsageBytes(data []byte, usage *OpenAIUsag return } - usage.InputTokens = int(gjson.GetBytes(data, "response.usage.input_tokens").Int()) - usage.OutputTokens = int(gjson.GetBytes(data, "response.usage.output_tokens").Int()) - usage.CacheReadInputTokens = int(gjson.GetBytes(data, "response.usage.input_tokens_details.cached_tokens").Int()) - usage.ImageOutputTokens = int(gjson.GetBytes(data, "response.usage.output_tokens_details.image_tokens").Int()) + if parsedUsage, ok := extractOpenAIUsageFromJSONBytes(data); ok { + *usage = parsedUsage + } } func extractOpenAIUsageFromJSONBytes(body []byte) (OpenAIUsage, bool) { if len(body) == 0 || !gjson.ValidBytes(body) { return OpenAIUsage{}, false } - values := gjson.GetManyBytes( - body, - "usage.input_tokens", - "usage.output_tokens", - "usage.input_tokens_details.cached_tokens", - "usage.output_tokens_details.image_tokens", - ) + if usage, ok := openAIUsageFromGJSON(gjson.GetBytes(body, "usage")); ok { + return usage, true + } + return openAIUsageFromGJSON(gjson.GetBytes(body, "response.usage")) +} + +func openAIUsageFromGJSON(value gjson.Result) (OpenAIUsage, bool) { + if !value.Exists() || !value.IsObject() { + return OpenAIUsage{}, false + } + inputTokens := value.Get("input_tokens").Int() + if inputTokens == 0 { + inputTokens = value.Get("prompt_tokens").Int() + } + outputTokens := value.Get("output_tokens").Int() + if outputTokens == 0 { + outputTokens = value.Get("completion_tokens").Int() + } + cacheReadTokens := value.Get("input_tokens_details.cached_tokens").Int() + if cacheReadTokens == 0 { + cacheReadTokens = value.Get("prompt_tokens_details.cached_tokens").Int() + } + imageOutputTokens := value.Get("output_tokens_details.image_tokens").Int() + if imageOutputTokens == 0 { + imageOutputTokens = value.Get("completion_tokens_details.image_tokens").Int() + } return OpenAIUsage{ - InputTokens: int(values[0].Int()), - OutputTokens: int(values[1].Int()), - CacheReadInputTokens: int(values[2].Int()), - ImageOutputTokens: int(values[3].Int()), + InputTokens: int(inputTokens), + OutputTokens: int(outputTokens), + CacheCreationInputTokens: int(value.Get("cache_creation_input_tokens").Int()), + CacheReadInputTokens: int(cacheReadTokens), + ImageOutputTokens: int(imageOutputTokens), }, true } diff --git a/backend/internal/service/openai_gateway_service_codex_cli_only_test.go b/backend/internal/service/openai_gateway_service_codex_cli_only_test.go index fe58e92f..951860cd 100644 --- a/backend/internal/service/openai_gateway_service_codex_cli_only_test.go +++ b/backend/internal/service/openai_gateway_service_codex_cli_only_test.go @@ -218,6 +218,12 @@ func TestIsOpenAITransientProcessingError(t *testing.T) { nil, )) + require.True(t, isOpenAITransientProcessingError( + http.StatusBadRequest, + "Selected model is at capacity. Please try a different model.", + []byte(`{"error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`), + )) + require.True(t, isOpenAITransientProcessingError( http.StatusBadRequest, "", @@ -332,3 +338,55 @@ func TestOpenAIGatewayService_Forward_TransientProcessingErrorTriggersFailover(t require.Contains(t, string(failoverErr.ResponseBody), "An error occurred while processing your request") require.False(t, c.Writer.Written(), "service 层应返回 failover 错误给上层换号,而不是直接向客户端写响应") } + +func TestOpenAIGatewayService_Forward_ModelCapacityErrorTriggersFailoverAndSameAccountRetry(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "x-request-id": []string{"rid-capacity-400"}, + }, + Body: io.NopCloser(strings.NewReader(`{"error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`)), + }, + } + svc := &OpenAIGatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ForceCodexCLI: false}, + }, + httpUpstream: upstream, + } + account := &Account{ + ID: 1001, + Name: "codex max套餐", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "pool_mode": true, + }, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + body := []byte(`{"model":"gpt-5.4","stream":false,"input":[{"type":"text","text":"hello"}]}`) + + _, err := svc.Forward(context.Background(), c, account, body) + require.Error(t, err) + + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadRequest, failoverErr.StatusCode) + require.True(t, failoverErr.RetryableOnSameAccount) + require.Contains(t, string(failoverErr.ResponseBody), "Selected model is at capacity") + require.False(t, c.Writer.Written(), "service 层应返回 failover 错误给上层重试/换号,而不是直接向客户端写响应") +} diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe71..013d7a08 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1116,6 +1116,47 @@ func TestOpenAIStreamingResponseFailedBeforeOutputReturnsFailover(t *testing.T) require.Empty(t, rec.Body.String()) } +func TestOpenAIStreamingResponseFailedBeforeOutputCapacityErrorReturnsFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + cfg := &config.Config{ + Gateway: config.GatewayConfig{ + StreamDataIntervalTimeout: 0, + StreamKeepaliveInterval: 0, + MaxLineSize: defaultMaxLineSize, + }, + } + svc := &OpenAIGatewayService{cfg: cfg} + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/", nil) + + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + "event: response.created", + `data: {"type":"response.created","response":{"id":"resp_1"}}`, + "", + "event: response.in_progress", + `data: {"type":"response.in_progress","response":{"id":"resp_1"}}`, + "", + "event: response.failed", + `data: {"type":"response.failed","error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`, + "", + }, "\n"))), + Header: http.Header{"X-Request-Id": []string{"rid-capacity-failed"}}, + } + + _, err := svc.handleStreamingResponse(c.Request.Context(), resp, c, &Account{ID: 1, Platform: PlatformOpenAI, Name: "acc"}, time.Now(), "model", "model") + require.Error(t, err) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.Contains(t, string(failoverErr.ResponseBody), "Selected model is at capacity") + require.False(t, c.Writer.Written()) + require.Empty(t, rec.Body.String()) +} + func TestOpenAIStreamingPreambleOnlyMissingTerminalReturnsFailover(t *testing.T) { gin.SetMode(gin.TestMode) cfg := &config.Config{ @@ -2174,6 +2215,25 @@ func TestParseSSEUsage_SelectiveParsing(t *testing.T) { require.Equal(t, 13, usage.InputTokens) require.Equal(t, 15, usage.OutputTokens) require.Equal(t, 4, usage.CacheReadInputTokens) + + svc.parseSSEUsage(`{"type":"response.completed","response":{"usage":{"prompt_tokens":21,"completion_tokens":8,"prompt_tokens_details":{"cached_tokens":6}}}}`, usage) + require.Equal(t, 21, usage.InputTokens) + require.Equal(t, 8, usage.OutputTokens) + require.Equal(t, 6, usage.CacheReadInputTokens) +} + +func TestExtractOpenAIUsageFromJSONBytes_AcceptsResponseAndChatUsageShapes(t *testing.T) { + usage, ok := extractOpenAIUsageFromJSONBytes([]byte(`{"id":"resp_1","usage":{"input_tokens":3,"output_tokens":5,"input_tokens_details":{"cached_tokens":2}}}`)) + require.True(t, ok) + require.Equal(t, 3, usage.InputTokens) + require.Equal(t, 5, usage.OutputTokens) + require.Equal(t, 2, usage.CacheReadInputTokens) + + usage, ok = extractOpenAIUsageFromJSONBytes([]byte(`{"type":"response.completed","response":{"usage":{"prompt_tokens":13,"completion_tokens":7,"prompt_tokens_details":{"cached_tokens":4}}}}`)) + require.True(t, ok) + require.Equal(t, 13, usage.InputTokens) + require.Equal(t, 7, usage.OutputTokens) + require.Equal(t, 4, usage.CacheReadInputTokens) } func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) { @@ -2293,3 +2353,29 @@ func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { require.Contains(t, rec.Body.String(), "upstream rejected request") require.Contains(t, rec.Header().Get("Content-Type"), "application/json") } + +func TestOpenAICompatSSEFrameParserResetsEventTypeAtFrameBoundary(t *testing.T) { + var parser openAICompatSSEFrameParser + + frame, ok := parser.AddLine("event: response.created") + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine(`data: {"response":{"id":"resp_1"}}`) + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Equal(t, "response.created", frame.EventType) + require.JSONEq(t, `{"response":{"id":"resp_1"}}`, frame.Data) + + frame, ok = parser.AddLine(`data: {"delta":"ok"}`) + require.False(t, ok) + require.Empty(t, frame.EventType) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Empty(t, frame.EventType) + require.JSONEq(t, `{"delta":"ok"}`, frame.Data) +} diff --git a/backend/internal/service/openai_ws_forwarder.go b/backend/internal/service/openai_ws_forwarder.go index 77cf7d95..192ff90a 100644 --- a/backend/internal/service/openai_ws_forwarder.go +++ b/backend/internal/service/openai_ws_forwarder.go @@ -399,15 +399,9 @@ func parseOpenAIWSResponseUsageFromCompletedEvent(message []byte, usage *OpenAIU if usage == nil || len(message) == 0 { return } - values := gjson.GetManyBytes( - message, - "response.usage.input_tokens", - "response.usage.output_tokens", - "response.usage.input_tokens_details.cached_tokens", - ) - usage.InputTokens = int(values[0].Int()) - usage.OutputTokens = int(values[1].Int()) - usage.CacheReadInputTokens = int(values[2].Int()) + if parsedUsage, ok := extractOpenAIUsageFromJSONBytes(message); ok { + *usage = parsedUsage + } } func parseOpenAIWSErrorEventFields(message []byte) (code string, errType string, errMessage string) { diff --git a/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go b/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go index 76167603..0350bde9 100644 --- a/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go +++ b/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go @@ -29,6 +29,14 @@ func TestParseOpenAIWSResponseUsageFromCompletedEvent(t *testing.T) { require.Equal(t, 11, usage.InputTokens) require.Equal(t, 7, usage.OutputTokens) require.Equal(t, 3, usage.CacheReadInputTokens) + + parseOpenAIWSResponseUsageFromCompletedEvent( + []byte(`{"type":"response.completed","response":{"usage":{"prompt_tokens":19,"completion_tokens":5,"prompt_tokens_details":{"cached_tokens":4}}}}`), + usage, + ) + require.Equal(t, 19, usage.InputTokens) + require.Equal(t, 5, usage.OutputTokens) + require.Equal(t, 4, usage.CacheReadInputTokens) } func TestOpenAIWSErrorEventHelpers_ConsistentWithWrapper(t *testing.T) { diff --git a/frontend/src/components/payment/PaymentProviderDialog.vue b/frontend/src/components/payment/PaymentProviderDialog.vue index 86304cf6..b6085ed0 100644 --- a/frontend/src/components/payment/PaymentProviderDialog.vue +++ b/frontend/src/components/payment/PaymentProviderDialog.vue @@ -34,7 +34,7 @@ -
+
{{ t('admin.settings.payment.paymentMode') }}