From ed7ef863478ae46557f24a344b2eccfac22e153c Mon Sep 17 00:00:00 2001 From: weak-fox <827367480@qq.com> Date: Fri, 15 May 2026 10:41:57 +0800 Subject: [PATCH 01/16] test: add capacity retry regressions --- ...nai_gateway_service_codex_cli_only_test.go | 58 +++++++++++++++++++ .../service/openai_gateway_service_test.go | 41 +++++++++++++ 2 files changed, 99 insertions(+) diff --git a/backend/internal/service/openai_gateway_service_codex_cli_only_test.go b/backend/internal/service/openai_gateway_service_codex_cli_only_test.go index fe58e92f..951860cd 100644 --- a/backend/internal/service/openai_gateway_service_codex_cli_only_test.go +++ b/backend/internal/service/openai_gateway_service_codex_cli_only_test.go @@ -218,6 +218,12 @@ func TestIsOpenAITransientProcessingError(t *testing.T) { nil, )) + require.True(t, isOpenAITransientProcessingError( + http.StatusBadRequest, + "Selected model is at capacity. Please try a different model.", + []byte(`{"error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`), + )) + require.True(t, isOpenAITransientProcessingError( http.StatusBadRequest, "", @@ -332,3 +338,55 @@ func TestOpenAIGatewayService_Forward_TransientProcessingErrorTriggersFailover(t require.Contains(t, string(failoverErr.ResponseBody), "An error occurred while processing your request") require.False(t, c.Writer.Written(), "service 层应返回 failover 错误给上层换号,而不是直接向客户端写响应") } + +func TestOpenAIGatewayService_Forward_ModelCapacityErrorTriggersFailoverAndSameAccountRetry(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{ + resp: &http.Response{ + StatusCode: http.StatusBadRequest, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "x-request-id": []string{"rid-capacity-400"}, + }, + Body: io.NopCloser(strings.NewReader(`{"error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`)), + }, + } + svc := &OpenAIGatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ForceCodexCLI: false}, + }, + httpUpstream: upstream, + } + account := &Account{ + ID: 1001, + Name: "codex max套餐", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "pool_mode": true, + }, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + body := []byte(`{"model":"gpt-5.4","stream":false,"input":[{"type":"text","text":"hello"}]}`) + + _, err := svc.Forward(context.Background(), c, account, body) + require.Error(t, err) + + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadRequest, failoverErr.StatusCode) + require.True(t, failoverErr.RetryableOnSameAccount) + require.Contains(t, string(failoverErr.ResponseBody), "Selected model is at capacity") + require.False(t, c.Writer.Written(), "service 层应返回 failover 错误给上层重试/换号,而不是直接向客户端写响应") +} diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe71..15fe85ad 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1116,6 +1116,47 @@ func TestOpenAIStreamingResponseFailedBeforeOutputReturnsFailover(t *testing.T) require.Empty(t, rec.Body.String()) } +func TestOpenAIStreamingResponseFailedBeforeOutputCapacityErrorReturnsFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + cfg := &config.Config{ + Gateway: config.GatewayConfig{ + StreamDataIntervalTimeout: 0, + StreamKeepaliveInterval: 0, + MaxLineSize: defaultMaxLineSize, + }, + } + svc := &OpenAIGatewayService{cfg: cfg} + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/", nil) + + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + "event: response.created", + `data: {"type":"response.created","response":{"id":"resp_1"}}`, + "", + "event: response.in_progress", + `data: {"type":"response.in_progress","response":{"id":"resp_1"}}`, + "", + "event: response.failed", + `data: {"type":"response.failed","error":{"message":"Selected model is at capacity. Please try a different model.","type":"invalid_request_error"}}`, + "", + }, "\n"))), + Header: http.Header{"X-Request-Id": []string{"rid-capacity-failed"}}, + } + + _, err := svc.handleStreamingResponse(c.Request.Context(), resp, c, &Account{ID: 1, Platform: PlatformOpenAI, Name: "acc"}, time.Now(), "model", "model") + require.Error(t, err) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.Contains(t, string(failoverErr.ResponseBody), "Selected model is at capacity") + require.False(t, c.Writer.Written()) + require.Empty(t, rec.Body.String()) +} + func TestOpenAIStreamingPreambleOnlyMissingTerminalReturnsFailover(t *testing.T) { gin.SetMode(gin.TestMode) cfg := &config.Config{ From 9f07741c139e05463983b236faf9a2b269af0fe4 Mon Sep 17 00:00:00 2001 From: weak-fox <827367480@qq.com> Date: Fri, 15 May 2026 10:43:29 +0800 Subject: [PATCH 02/16] fix: retry model capacity transient errors --- backend/internal/service/openai_gateway_service.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e..6cda65c0 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1113,6 +1113,9 @@ func isOpenAITransientProcessingError(upstreamStatusCode int, upstreamMsg string if strings.Contains(lower, "an error occurred while processing your request") { return true } + if strings.Contains(lower, "selected model is at capacity") { + return true + } return strings.Contains(lower, "you can retry your request") && strings.Contains(lower, "help.openai.com") && strings.Contains(lower, "request id") @@ -3400,6 +3403,9 @@ func openAIStreamDataStartsClientOutput(data, eventType string) bool { } func openAIStreamFailedEventShouldFailover(payload []byte, message string) bool { + if isOpenAITransientProcessingError(http.StatusBadRequest, message, payload) { + return true + } code := strings.ToLower(strings.TrimSpace(gjson.GetBytes(payload, "response.error.code").String())) if code == "" { code = strings.ToLower(strings.TrimSpace(gjson.GetBytes(payload, "error.code").String())) From 348a487739cf5d0b7e4b44f76003a38adba7a89c Mon Sep 17 00:00:00 2001 From: yetone Date: Fri, 15 May 2026 23:29:56 +0800 Subject: [PATCH 03/16] fix(codex-transform): preserve underscore when rewriting call_* tool-call ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `fixCallIDPrefix` builds malformed ids when the input has the standard OpenAI `call_` prefix: input: call_YYen1qxDejd2myJwcTCf7Nyp output: fcYYen1qxDejd2myJwcTCf7Nyp ← no underscore between 'fc' and the nanoid ChatGPT's codex backend then rejects the replayed item with: 400 Invalid 'input[N].id': 'fcYYen1qxDejd2myJwcTCf7Nyp'. Expected an ID that contains letters, numbers, underscores, or dashes, but this value contained additional characters. Sub2api wraps that into 502 to the client. Clients using the OpenAI SDK on the OAuth/codex path see every multi-hop turn (after the first tool call) fail because the item_reference rewritten this way gets sent on every subsequent hop. The other two branches of the same function correctly emit `fc_` (line 1029: pass-through when already `fc*`; line 1035 fallback: `fc_" + id`). Only the `call_` → `fc_` rewrite was missing the underscore — looks like a copy-paste slip during the original commit. Fix: change `"fc"` to `"fc_"` on the call_ branch. One character. Repro: client (OpenAI SDK) sends a function_call_output whose call_id is `call_` (default OpenAI format). The sub2api request body also contains an item_reference whose id mirrors the call_id (also `call_`). On the codex OAuth path, this rewrite fires for the item_reference's id, producing the malformed value. Affects: `platform=openai type=oauth` accounts whose clients use the official OpenAI SDK / Responses API conventions (id prefix `call_`). API-key accounts and bridge-mode requests are untouched. --- backend/internal/service/openai_codex_transform.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/internal/service/openai_codex_transform.go b/backend/internal/service/openai_codex_transform.go index a3b69dee..5b5f2bc3 100644 --- a/backend/internal/service/openai_codex_transform.go +++ b/backend/internal/service/openai_codex_transform.go @@ -1030,7 +1030,7 @@ func filterCodexInputWithOptions(input []any, opts codexInputFilterOptions) []an return id } if strings.HasPrefix(id, "call_") { - return "fc" + strings.TrimPrefix(id, "call_") + return "fc_" + strings.TrimPrefix(id, "call_") } return "fc_" + id } From b0c7723393a43ffca8277f597f375e148ba3ff2f Mon Sep 17 00:00:00 2001 From: yetone Date: Sat, 16 May 2026 00:50:35 +0800 Subject: [PATCH 04/16] fix(admin/settings): make tab shell readable in dark mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Vue's scoped-CSS compiler was dropping the `:global(.dark) .settings-tabs-shell` rules in the production build, so the tab strip kept its light-mode white background and the inactive tab labels (text-gray-300) showed at ~1.6:1 contrast — effectively unreadable. Hoist the three dark-mode overrides into an unscoped ` + + From 0393bd7c82da1ca385af0744e9e19c1490c93bc0 Mon Sep 17 00:00:00 2001 From: name <136912576+is7Qin@users.noreply.github.com> Date: Sat, 16 May 2026 02:37:55 +0800 Subject: [PATCH 05/16] Fix OpenAI compat usage parsing --- backend/internal/pkg/apicompat/types.go | 31 ++++++++++ .../service/openai_compat_model_test.go | 57 +++++++++++++++++++ .../service/openai_gateway_service.go | 49 +++++++++++----- .../service/openai_gateway_service_test.go | 19 +++++++ .../internal/service/openai_ws_forwarder.go | 12 +--- ..._ws_forwarder_hotpath_optimization_test.go | 8 +++ 6 files changed, 152 insertions(+), 24 deletions(-) diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index f9cd5a1c..df75ce50 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -306,6 +306,37 @@ type ResponsesUsage struct { OutputTokensDetails *ResponsesOutputTokensDetails `json:"output_tokens_details,omitempty"` } +func (u *ResponsesUsage) UnmarshalJSON(data []byte) error { + type responsesUsageAlias ResponsesUsage + var aux struct { + responsesUsageAlias + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + PromptTokensDetails *ResponsesInputTokensDetails `json:"prompt_tokens_details,omitempty"` + CompletionTokensDetails *ResponsesOutputTokensDetails `json:"completion_tokens_details,omitempty"` + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + *u = ResponsesUsage(aux.responsesUsageAlias) + if u.InputTokens == 0 && aux.PromptTokens != 0 { + u.InputTokens = aux.PromptTokens + } + if u.OutputTokens == 0 && aux.CompletionTokens != 0 { + u.OutputTokens = aux.CompletionTokens + } + if u.InputTokensDetails == nil && aux.PromptTokensDetails != nil { + u.InputTokensDetails = aux.PromptTokensDetails + } + if u.OutputTokensDetails == nil && aux.CompletionTokensDetails != nil { + u.OutputTokensDetails = aux.CompletionTokensDetails + } + if u.TotalTokens == 0 && (u.InputTokens != 0 || u.OutputTokens != 0) { + u.TotalTokens = u.InputTokens + u.OutputTokens + } + return nil +} + // ResponsesInputTokensDetails breaks down input token usage. type ResponsesInputTokensDetails struct { CachedTokens int `json:"cached_tokens,omitempty"` diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index e222b093..6aa4ef09 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -183,6 +183,63 @@ func TestForwardAsAnthropic_NormalizesRoutingAndEffortForGpt54XHigh(t *testing.T t.Logf("response body: %s", rec.Body.String()) } +func TestForwardAsAnthropic_MappedClaudeModelAcceptsChatUsageShape(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"claude-opus-4-7","max_tokens":16,"messages":[{"role":"user","content":"compact this"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_compact","model":"gpt-5.5","status":"in_progress","output":[]}}`, + "", + `data: {"type":"response.output_text.delta","delta":"ok"}`, + "", + `data: {"type":"response.completed","response":{"id":"resp_compact","object":"response","model":"gpt-5.5","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"prompt_tokens":31,"completion_tokens":9,"total_tokens":40,"prompt_tokens_details":{"cached_tokens":11}}}}`, + "", + "data: [DONE]", + "", + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_compact_usage"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{ + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + account := &Account{ + ID: 1, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + "model_mapping": map[string]any{ + "gpt-5.5": "gpt-5.5", + }, + }, + } + + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.5") + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "claude-opus-4-7", result.Model) + require.Equal(t, "gpt-5.5", result.BillingModel) + require.Equal(t, "gpt-5.5", result.UpstreamModel) + require.Equal(t, 31, result.Usage.InputTokens) + require.Equal(t, 9, result.Usage.OutputTokens) + require.Equal(t, 11, result.Usage.CacheReadInputTokens) + require.Equal(t, "gpt-5.5", gjson.GetBytes(upstream.lastBody, "model").String()) +} + func TestForwardAsAnthropic_InjectsPromptCacheKeyForAPIKeyMessagesDispatch(t *testing.T) { t.Parallel() gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e..3a2cb0c3 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -4639,28 +4639,47 @@ func (s *OpenAIGatewayService) parseSSEUsageBytes(data []byte, usage *OpenAIUsag return } - usage.InputTokens = int(gjson.GetBytes(data, "response.usage.input_tokens").Int()) - usage.OutputTokens = int(gjson.GetBytes(data, "response.usage.output_tokens").Int()) - usage.CacheReadInputTokens = int(gjson.GetBytes(data, "response.usage.input_tokens_details.cached_tokens").Int()) - usage.ImageOutputTokens = int(gjson.GetBytes(data, "response.usage.output_tokens_details.image_tokens").Int()) + if parsedUsage, ok := extractOpenAIUsageFromJSONBytes(data); ok { + *usage = parsedUsage + } } func extractOpenAIUsageFromJSONBytes(body []byte) (OpenAIUsage, bool) { if len(body) == 0 || !gjson.ValidBytes(body) { return OpenAIUsage{}, false } - values := gjson.GetManyBytes( - body, - "usage.input_tokens", - "usage.output_tokens", - "usage.input_tokens_details.cached_tokens", - "usage.output_tokens_details.image_tokens", - ) + if usage, ok := openAIUsageFromGJSON(gjson.GetBytes(body, "usage")); ok { + return usage, true + } + return openAIUsageFromGJSON(gjson.GetBytes(body, "response.usage")) +} + +func openAIUsageFromGJSON(value gjson.Result) (OpenAIUsage, bool) { + if !value.Exists() || !value.IsObject() { + return OpenAIUsage{}, false + } + inputTokens := value.Get("input_tokens").Int() + if inputTokens == 0 { + inputTokens = value.Get("prompt_tokens").Int() + } + outputTokens := value.Get("output_tokens").Int() + if outputTokens == 0 { + outputTokens = value.Get("completion_tokens").Int() + } + cacheReadTokens := value.Get("input_tokens_details.cached_tokens").Int() + if cacheReadTokens == 0 { + cacheReadTokens = value.Get("prompt_tokens_details.cached_tokens").Int() + } + imageOutputTokens := value.Get("output_tokens_details.image_tokens").Int() + if imageOutputTokens == 0 { + imageOutputTokens = value.Get("completion_tokens_details.image_tokens").Int() + } return OpenAIUsage{ - InputTokens: int(values[0].Int()), - OutputTokens: int(values[1].Int()), - CacheReadInputTokens: int(values[2].Int()), - ImageOutputTokens: int(values[3].Int()), + InputTokens: int(inputTokens), + OutputTokens: int(outputTokens), + CacheCreationInputTokens: int(value.Get("cache_creation_input_tokens").Int()), + CacheReadInputTokens: int(cacheReadTokens), + ImageOutputTokens: int(imageOutputTokens), }, true } diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe71..708a7146 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -2174,6 +2174,25 @@ func TestParseSSEUsage_SelectiveParsing(t *testing.T) { require.Equal(t, 13, usage.InputTokens) require.Equal(t, 15, usage.OutputTokens) require.Equal(t, 4, usage.CacheReadInputTokens) + + svc.parseSSEUsage(`{"type":"response.completed","response":{"usage":{"prompt_tokens":21,"completion_tokens":8,"prompt_tokens_details":{"cached_tokens":6}}}}`, usage) + require.Equal(t, 21, usage.InputTokens) + require.Equal(t, 8, usage.OutputTokens) + require.Equal(t, 6, usage.CacheReadInputTokens) +} + +func TestExtractOpenAIUsageFromJSONBytes_AcceptsResponseAndChatUsageShapes(t *testing.T) { + usage, ok := extractOpenAIUsageFromJSONBytes([]byte(`{"id":"resp_1","usage":{"input_tokens":3,"output_tokens":5,"input_tokens_details":{"cached_tokens":2}}}`)) + require.True(t, ok) + require.Equal(t, 3, usage.InputTokens) + require.Equal(t, 5, usage.OutputTokens) + require.Equal(t, 2, usage.CacheReadInputTokens) + + usage, ok = extractOpenAIUsageFromJSONBytes([]byte(`{"type":"response.completed","response":{"usage":{"prompt_tokens":13,"completion_tokens":7,"prompt_tokens_details":{"cached_tokens":4}}}}`)) + require.True(t, ok) + require.Equal(t, 13, usage.InputTokens) + require.Equal(t, 7, usage.OutputTokens) + require.Equal(t, 4, usage.CacheReadInputTokens) } func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) { diff --git a/backend/internal/service/openai_ws_forwarder.go b/backend/internal/service/openai_ws_forwarder.go index 77cf7d95..192ff90a 100644 --- a/backend/internal/service/openai_ws_forwarder.go +++ b/backend/internal/service/openai_ws_forwarder.go @@ -399,15 +399,9 @@ func parseOpenAIWSResponseUsageFromCompletedEvent(message []byte, usage *OpenAIU if usage == nil || len(message) == 0 { return } - values := gjson.GetManyBytes( - message, - "response.usage.input_tokens", - "response.usage.output_tokens", - "response.usage.input_tokens_details.cached_tokens", - ) - usage.InputTokens = int(values[0].Int()) - usage.OutputTokens = int(values[1].Int()) - usage.CacheReadInputTokens = int(values[2].Int()) + if parsedUsage, ok := extractOpenAIUsageFromJSONBytes(message); ok { + *usage = parsedUsage + } } func parseOpenAIWSErrorEventFields(message []byte) (code string, errType string, errMessage string) { diff --git a/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go b/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go index 76167603..0350bde9 100644 --- a/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go +++ b/backend/internal/service/openai_ws_forwarder_hotpath_optimization_test.go @@ -29,6 +29,14 @@ func TestParseOpenAIWSResponseUsageFromCompletedEvent(t *testing.T) { require.Equal(t, 11, usage.InputTokens) require.Equal(t, 7, usage.OutputTokens) require.Equal(t, 3, usage.CacheReadInputTokens) + + parseOpenAIWSResponseUsageFromCompletedEvent( + []byte(`{"type":"response.completed","response":{"usage":{"prompt_tokens":19,"completion_tokens":5,"prompt_tokens_details":{"cached_tokens":4}}}}`), + usage, + ) + require.Equal(t, 19, usage.InputTokens) + require.Equal(t, 5, usage.OutputTokens) + require.Equal(t, 4, usage.CacheReadInputTokens) } func TestOpenAIWSErrorEventHelpers_ConsistentWithWrapper(t *testing.T) { From 44995404ef409772f6779efe4ba1f6b77c07a58f Mon Sep 17 00:00:00 2001 From: wucm667 Date: Sun, 17 May 2026 11:19:47 +0800 Subject: [PATCH 06/16] fix(docker): pin frontend builder pnpm to v9 `corepack prepare pnpm@latest` now resolves to pnpm 11, which promotes ERR_PNPM_IGNORED_BUILDS to a hard error and breaks the frontend stage of `docker build`. Pin pnpm to v9 to match the CI workflow (pnpm/action-setup version: 9) and keep image builds reproducible. Fixes #2442 --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7befb464..d556008b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,8 +20,8 @@ FROM ${NODE_IMAGE} AS frontend-builder WORKDIR /app/frontend -# Install pnpm -RUN corepack enable && corepack prepare pnpm@latest --activate +# Install pnpm (pinned to v9 to match CI and keep builds reproducible) +RUN corepack enable && corepack prepare pnpm@9 --activate # Install dependencies first (better caching) COPY frontend/package.json frontend/pnpm-lock.yaml ./ From cc5328c4917cfb7a0b221ac5636f546bca43b565 Mon Sep 17 00:00:00 2001 From: lyen1688 Date: Sun, 17 May 2026 15:33:34 +0800 Subject: [PATCH 07/16] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20OpenAI=20Responses?= =?UTF-8?q?=20SSE=20=E7=BB=88=E6=AD=A2=E4=BA=8B=E4=BB=B6=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/openai_compat_model_test.go | 190 ++++++++++++++++++ .../openai_gateway_chat_completions.go | 37 +++- .../openai_gateway_chat_completions_test.go | 114 +++++++++++ .../service/openai_gateway_messages.go | 49 ++++- .../service/openai_gateway_service.go | 70 +++++++ .../service/openai_gateway_service_test.go | 26 +++ 6 files changed, 474 insertions(+), 12 deletions(-) diff --git a/backend/internal/service/openai_compat_model_test.go b/backend/internal/service/openai_compat_model_test.go index e222b093..0ba2a63f 100644 --- a/backend/internal/service/openai_compat_model_test.go +++ b/backend/internal/service/openai_compat_model_test.go @@ -1360,6 +1360,135 @@ func TestForwardAsAnthropic_TerminalUsageWithoutUpstreamCloseReturns(t *testing. } } +func TestForwardAsAnthropic_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsAnthropic_EventNamedTerminalWithKeepaliveReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Writer = &openAICompatFailingWriter{ResponseWriter: c.Writer, failAfter: 0} + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `: upstream ping`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_event_named_keepalive"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + StreamKeepaliveInterval: 5, + }}, + httpUpstream: upstream, + } + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic keepalive path should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) @@ -1416,6 +1545,67 @@ func TestForwardAsAnthropic_BufferedTerminalWithoutUpstreamCloseReturns(t *testi } } +func TestForwardAsAnthropic_BufferedEventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","max_tokens":16,"messages":[{"role":"user","content":"hello"}],"stream":false}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":15,"output_tokens":6,"total_tokens":21,"input_tokens_details":{"cached_tokens":5}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_messages_buffered_event_named"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsAnthropic(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 15, got.result.Usage.InputTokens) + require.Equal(t, 6, got.result.Usage.OutputTokens) + require.Equal(t, 5, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"stop_reason":"end_turn"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsAnthropic buffered response should use SSE event names when data payloads omit type") + } +} + func TestForwardAsAnthropic_DoneSentinelWithoutTerminalReturnsError(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_chat_completions.go b/backend/internal/service/openai_gateway_chat_completions.go index 84d85c74..5b3c0e6f 100644 --- a/backend/internal/service/openai_gateway_chat_completions.go +++ b/backend/internal/service/openai_gateway_chat_completions.go @@ -554,6 +554,13 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + if strings.TrimSpace(payload) == "[DONE]" { + return false + } + return processDataLine(payload) + } // Determine keepalive interval keepaliveInterval := time.Duration(0) @@ -563,16 +570,17 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( // No keepalive: fast synchronous path if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -580,6 +588,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -624,11 +640,20 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -637,14 +662,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse( } lastDataAt = time.Now() line := ev.line - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if strings.TrimSpace(payload) == "[DONE]" { + if strings.TrimSpace(frame.Data) == "[DONE]" { return missingTerminalErr() } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_chat_completions_test.go b/backend/internal/service/openai_gateway_chat_completions_test.go index b0d1fa31..a26091a3 100644 --- a/backend/internal/service/openai_gateway_chat_completions_test.go +++ b/backend/internal/service/openai_gateway_chat_completions_test.go @@ -236,6 +236,120 @@ func TestForwardAsChatCompletions_TerminalUsageWithoutUpstreamCloseReturns(t *te } } +func TestForwardAsChatCompletions_EventNamedTerminalWithoutUpstreamCloseReturns(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := []byte(strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `event: response.output_text.delta`, + `data: {"delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + ``, + }, "\n")) + upstreamStream := newOpenAICompatBlockingReadCloser(upstreamBody) + defer func() { + require.NoError(t, upstreamStream.Close()) + }() + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_named_terminal"}}, + Body: upstreamStream, + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + type forwardResult struct { + result *OpenAIForwardResult + err error + } + resultCh := make(chan forwardResult, 1) + go func() { + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + resultCh <- forwardResult{result: result, err: err} + }() + + select { + case got := <-resultCh: + require.NoError(t, got.err) + require.NotNil(t, got.result) + require.Equal(t, 17, got.result.Usage.InputTokens) + require.Equal(t, 8, got.result.Usage.OutputTokens) + require.Equal(t, 6, got.result.Usage.CacheReadInputTokens) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + case <-time.After(time.Second): + require.Fail(t, "ForwardAsChatCompletions should use SSE event names when data payloads omit type") + } +} + +func TestForwardAsChatCompletions_EventTypeDoesNotLeakAcrossFrames(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","messages":[{"role":"user","content":"hello"}],"stream":true}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstreamBody := strings.Join([]string{ + `event: response.created`, + `data: {"response":{"id":"resp_1","model":"gpt-5.4","status":"in_progress","output":[]}}`, + ``, + `data: {"type":"response.output_text.delta","delta":"ok"}`, + ``, + `event: response.completed`, + `data: {"response":{"id":"resp_1","object":"response","model":"gpt-5.4","status":"completed","output":[{"type":"message","id":"msg_1","role":"assistant","status":"completed","content":[{"type":"output_text","text":"ok"}]}],"usage":{"input_tokens":17,"output_tokens":8,"total_tokens":25,"input_tokens_details":{"cached_tokens":6}}}}`, + ``, + `data: [DONE]`, + ``, + }, "\n") + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_chat_event_boundary"}}, + Body: io.NopCloser(strings.NewReader(upstreamBody)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + + result, err := svc.ForwardAsChatCompletions(context.Background(), c, account, body, "", "gpt-5.1") + require.NoError(t, err) + require.NotNil(t, result) + require.Contains(t, rec.Body.String(), `"content":"ok"`) + require.Contains(t, rec.Body.String(), `data: [DONE]`) +} + func TestForwardAsChatCompletions_BufferedTerminalWithoutUpstreamCloseReturns(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index aefa8fd2..6d74f7dd 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -560,10 +560,24 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( }() defer close(done) + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { + if frame, ok := parser.Finish(); ok { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + var event apicompat.ResponsesStreamEvent + if err := json.Unmarshal([]byte(payload), &event); err == nil { + acc.ProcessEvent(&event) + if isOpenAICompatResponsesTerminalEvent(event.Type) && event.Response != nil { + if event.Response.Usage != nil { + usage = copyOpenAIUsageFromResponsesUsage(event.Response.Usage) + } + return event.Response, usage, acc, nil + } + } + } return nil, usage, acc, nil } resetTimeout() @@ -580,10 +594,11 @@ func (s *OpenAIGatewayService) readOpenAICompatBufferedTerminal( if isOpenAICompatDoneSentinelLine(ev.line) { return nil, usage, acc, nil } - payload, ok := extractOpenAISSEDataLine(ev.line) - if !ok || payload == "" { + frame, ok := parser.AddLine(ev.line) + if !ok { continue } + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) var event apicompat.ResponsesStreamEvent if err := json.Unmarshal([]byte(payload), &event); err != nil { @@ -772,6 +787,10 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( missingTerminalErr := func() (*OpenAIForwardResult, error) { return resultWithUsage(), fmt.Errorf("stream usage incomplete: missing terminal event") } + processFrame := func(frame openAICompatSSEFrame) bool { + payload := openAICompatPayloadWithEventType(frame.Data, frame.EventType) + return processDataLine(payload) + } // ── Determine keepalive interval ── keepaliveInterval := time.Duration(0) @@ -781,16 +800,17 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( // ── No keepalive: fast synchronous path (no goroutine overhead) ── if streamInterval <= 0 && keepaliveInterval <= 0 { + var parser openAICompatSSEFrameParser for scanner.Scan() { line := scanner.Text() if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } } @@ -798,6 +818,14 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( handleScanErr(err) return resultWithUsage(), fmt.Errorf("stream usage incomplete: %w", err) } + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } @@ -842,12 +870,21 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( keepaliveCh = keepaliveTicker.C } lastDataAt := time.Now() + var parser openAICompatSSEFrameParser for { select { case ev, ok := <-events: if !ok { // Upstream closed + if frame, ok := parser.Finish(); ok { + if strings.TrimSpace(frame.Data) == "[DONE]" { + return missingTerminalErr() + } + if processFrame(frame) { + return finalizeStream() + } + } return missingTerminalErr() } if ev.err != nil { @@ -859,11 +896,11 @@ func (s *OpenAIGatewayService) handleAnthropicStreamingResponse( if isOpenAICompatDoneSentinelLine(line) { return missingTerminalErr() } - payload, ok := extractOpenAISSEDataLine(line) + frame, ok := parser.AddLine(line) if !ok { continue } - if processDataLine(payload) { + if processFrame(frame) { return finalizeStream() } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e..a2276353 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -4578,6 +4578,76 @@ func extractOpenAISSEDataLine(line string) (string, bool) { return line[start:], true } +func extractOpenAISSEEventLine(line string) (string, bool) { + if !strings.HasPrefix(line, "event:") { + return "", false + } + start := len("event:") + for start < len(line) { + if line[start] != ' ' && line[start] != ' ' { + break + } + start++ + } + return strings.TrimSpace(line[start:]), true +} + +type openAICompatSSEFrame struct { + EventType string + Data string +} + +type openAICompatSSEFrameParser struct { + eventType string + dataLines []string +} + +func (p *openAICompatSSEFrameParser) AddLine(line string) (openAICompatSSEFrame, bool) { + if line == "" { + return p.dispatch() + } + if strings.HasPrefix(line, ":") { + return openAICompatSSEFrame{}, false + } + if eventType, ok := extractOpenAISSEEventLine(line); ok { + p.eventType = eventType + return openAICompatSSEFrame{}, false + } + if data, ok := extractOpenAISSEDataLine(line); ok { + p.dataLines = append(p.dataLines, data) + } + return openAICompatSSEFrame{}, false +} + +func (p *openAICompatSSEFrameParser) Finish() (openAICompatSSEFrame, bool) { + return p.dispatch() +} + +func (p *openAICompatSSEFrameParser) dispatch() (openAICompatSSEFrame, bool) { + frame := openAICompatSSEFrame{ + EventType: p.eventType, + Data: strings.Join(p.dataLines, "\n"), + } + p.eventType = "" + p.dataLines = nil + return frame, frame.Data != "" +} + +func openAICompatPayloadWithEventType(payload, eventType string) string { + eventType = strings.TrimSpace(eventType) + if eventType == "" || strings.TrimSpace(payload) == "" || strings.TrimSpace(payload) == "[DONE]" { + return payload + } + if gjson.Get(payload, "type").Exists() { + return payload + } + patched, err := sjson.Set(payload, "type", eventType) + if err != nil { + return payload + } + return patched +} + func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { data, ok := extractOpenAISSEDataLine(line) if !ok { diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe71..d636cf27 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -2293,3 +2293,29 @@ func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { require.Contains(t, rec.Body.String(), "upstream rejected request") require.Contains(t, rec.Header().Get("Content-Type"), "application/json") } + +func TestOpenAICompatSSEFrameParserResetsEventTypeAtFrameBoundary(t *testing.T) { + var parser openAICompatSSEFrameParser + + frame, ok := parser.AddLine("event: response.created") + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine(`data: {"response":{"id":"resp_1"}}`) + require.False(t, ok) + require.Empty(t, frame) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Equal(t, "response.created", frame.EventType) + require.JSONEq(t, `{"response":{"id":"resp_1"}}`, frame.Data) + + frame, ok = parser.AddLine(`data: {"delta":"ok"}`) + require.False(t, ok) + require.Empty(t, frame.EventType) + + frame, ok = parser.AddLine("") + require.True(t, ok) + require.Empty(t, frame.EventType) + require.JSONEq(t, `{"delta":"ok"}`, frame.Data) +} From 1b03240515a465f878f421cd04f00550ed70e0ac Mon Sep 17 00:00:00 2001 From: Yuhao Jiang Date: Sun, 17 May 2026 14:58:42 -0500 Subject: [PATCH 08/16] =?UTF-8?q?fix(payment):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=94=AF=E4=BB=98=E5=AE=9D=E5=AE=98=E6=96=B9=E6=89=AB=E7=A0=81?= =?UTF-8?q?=E4=BA=8C=E7=BB=B4=E7=A0=81=E7=94=9F=E6=88=90=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支付宝官方服务商在 precreate(当面付)不可用回退到 page.pay 时, 错误地把网页跳转 URL 当作可扫码二维码内容返回。前端用 QRCode 库 把这段 URL 渲染成二维码后,支付宝 APP 无法识别(扫到的只是个 HTTP URL,不是支付二维码),用户必须点"重新打开支付页面"跳转到支付宝 收银台才能扫到真正可用的二维码。 - 后端 alipay.go:createPagePayTrade 不再把 PayURL 塞给 QRCode; createDesktopTrade 在 paymentMode == "redirect" 时跳过 precreate 直接走 page.pay,避免没开通"当面付"的商户走一次无用的 API 调用 - 前端管理端 PaymentProviderDialog:让支付宝官方实例可在"支付模式" 中选择"跳转",开启后始终在新标签页打开支付宝收银台 - ProviderCard 的 modeLabel 增加 redirect 分支 - 补充 TestCreateTradeRedirectModeSkipsPrecreate 单元测试 Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/payment/provider/alipay.go | 25 ++++++-- .../internal/payment/provider/alipay_test.go | 59 ++++++++++++++++++- .../payment/PaymentProviderDialog.vue | 50 ++++++++++++++-- .../src/components/payment/ProviderCard.vue | 3 +- .../src/components/payment/providerConfig.ts | 5 ++ 5 files changed, 130 insertions(+), 12 deletions(-) diff --git a/backend/internal/payment/provider/alipay.go b/backend/internal/payment/provider/alipay.go index 1234b568..c4c6e634 100644 --- a/backend/internal/payment/provider/alipay.go +++ b/backend/internal/payment/provider/alipay.go @@ -105,10 +105,16 @@ func (a *Alipay) MerchantIdentityMetadata() map[string]string { // CreatePayment creates an Alipay payment using the following routing: // - Mobile (H5): alipay.trade.wap.pay — browser redirect into Alipay. -// - Desktop: prefer alipay.trade.precreate to get a scan payload directly. -// - Desktop fallback: if precreate is unavailable for the merchant, fall back -// to alipay.trade.page.pay and expose both pay_url and qr_code so the -// frontend can render a QR while still allowing direct page open. +// - Desktop, default: prefer alipay.trade.precreate (FACE_TO_FACE_PAYMENT) to +// get a scannable QR payload. If precreate is unavailable for the merchant, +// fall back to alipay.trade.page.pay and expose pay_url only — the frontend +// opens the Alipay checkout in a new tab. +// - Desktop, paymentMode == "redirect": skip precreate and go straight to +// alipay.trade.page.pay so the frontend always opens the Alipay checkout +// in a new tab. Use this when the merchant has not enabled FACE_TO_FACE_PAYMENT. +// +// Note: alipay.trade.page.pay returns a checkout page URL, not a scannable +// payment QR. Never expose it via the QRCode field. func (a *Alipay) CreatePayment(ctx context.Context, req payment.CreatePaymentRequest) (*payment.CreatePaymentResponse, error) { client, err := a.getClient() if err != nil { @@ -150,6 +156,13 @@ func (a *Alipay) createWapTrade(client *alipay.Client, req payment.CreatePayment } func (a *Alipay) createDesktopTrade(ctx context.Context, client *alipay.Client, req payment.CreatePaymentRequest, notifyURL, returnURL string) (*payment.CreatePaymentResponse, error) { + // Explicit redirect mode: merchant opted into "always open the Alipay + // checkout page in a new tab" via the provider instance's payment_mode. + // Skip precreate to avoid a wasted API call. + if strings.EqualFold(strings.TrimSpace(a.config["paymentMode"]), "redirect") { + return a.createPagePayTrade(client, req, notifyURL, returnURL) + } + resp, precreateErr := a.createPrecreateTrade(ctx, client, req, notifyURL) if precreateErr == nil { return resp, nil @@ -204,10 +217,12 @@ func (a *Alipay) createPagePayTrade(client *alipay.Client, req payment.CreatePay if err != nil { return nil, fmt.Errorf("alipay TradePagePay: %w", err) } + // Only PayURL is exposed: alipay.trade.page.pay returns a checkout page URL + // that must be opened in a browser, not a scannable payment QR. Setting it + // as QRCode would let the frontend render an unscannable image. return &payment.CreatePaymentResponse{ TradeNo: req.OrderID, PayURL: payURL.String(), - QRCode: payURL.String(), }, nil } diff --git a/backend/internal/payment/provider/alipay_test.go b/backend/internal/payment/provider/alipay_test.go index fdc8eec1..9f8aec53 100644 --- a/backend/internal/payment/provider/alipay_test.go +++ b/backend/internal/payment/provider/alipay_test.go @@ -189,8 +189,63 @@ func TestCreateTradeUsesPagePayForDesktop(t *testing.T) { if resp.PayURL == "" { t.Fatal("expected pay_url for desktop page pay") } - if resp.QRCode != resp.PayURL { - t.Fatalf("qr_code = %q, want same as pay_url %q", resp.QRCode, resp.PayURL) + // page.pay returns a checkout page URL, not a scannable QR payload — + // it must never be exposed via QRCode (the frontend would render an + // unscannable image from it). + if resp.QRCode != "" { + t.Fatalf("qr_code = %q, want empty for page pay", resp.QRCode) + } +} + +// When the provider instance is configured with paymentMode == "redirect", +// the desktop flow must skip precreate and go straight to page.pay. +func TestCreateTradeRedirectModeSkipsPrecreate(t *testing.T) { + origPreCreate := alipayTradePreCreate + origPagePay := alipayTradePagePay + t.Cleanup(func() { + alipayTradePreCreate = origPreCreate + alipayTradePagePay = origPagePay + }) + + preCreateCalls := 0 + pagePayCalls := 0 + alipayTradePreCreate = func(ctx context.Context, client *alipay.Client, param alipay.TradePreCreate) (*alipay.TradePreCreateRsp, error) { + preCreateCalls++ + return &alipay.TradePreCreateRsp{ + Error: alipay.Error{Code: alipay.CodeSuccess}, + QRCode: "https://qr.alipay.example.com/precreate-token", + }, nil + } + alipayTradePagePay = func(client *alipay.Client, param alipay.TradePagePay) (*url.URL, error) { + pagePayCalls++ + if param.ProductCode != alipayProductCodePagePay { + t.Fatalf("product_code = %q, want %q", param.ProductCode, alipayProductCodePagePay) + } + return url.Parse("https://openapi.alipay.com/gateway.do?page-pay") + } + + provider := &Alipay{ + config: map[string]string{"paymentMode": "redirect"}, + } + resp, err := provider.createDesktopTrade(context.Background(), &alipay.Client{}, payment.CreatePaymentRequest{ + OrderID: "sub2_103", + Amount: "12.00", + Subject: "Balance recharge", + }, "https://merchant.example.com/api/v1/payment/webhook/alipay", "https://merchant.example.com/payment/result") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if preCreateCalls != 0 { + t.Fatalf("precreate calls = %d, want 0 (redirect mode must skip precreate)", preCreateCalls) + } + if pagePayCalls != 1 { + t.Fatalf("page pay calls = %d, want 1", pagePayCalls) + } + if resp.PayURL == "" { + t.Fatal("expected pay_url for redirect mode") + } + if resp.QRCode != "" { + t.Fatalf("qr_code = %q, want empty for redirect mode", resp.QRCode) } } diff --git a/frontend/src/components/payment/PaymentProviderDialog.vue b/frontend/src/components/payment/PaymentProviderDialog.vue index 86304cf6..b6085ed0 100644 --- a/frontend/src/components/payment/PaymentProviderDialog.vue +++ b/frontend/src/components/payment/PaymentProviderDialog.vue @@ -34,7 +34,7 @@ -
+
{{ t('admin.settings.payment.paymentMode') }}