diff --git a/backend/cmd/server/VERSION b/backend/cmd/server/VERSION index c21e67e6..c29f5f75 100644 --- a/backend/cmd/server/VERSION +++ b/backend/cmd/server/VERSION @@ -1 +1 @@ -0.1.113 +0.1.114 diff --git a/backend/internal/domain/constants.go b/backend/internal/domain/constants.go index 429486c3..a57f7067 100644 --- a/backend/internal/domain/constants.go +++ b/backend/internal/domain/constants.go @@ -71,6 +71,7 @@ const ( // 与前端 useModelWhitelist.ts 中的 antigravityDefaultMappings 保持一致 var DefaultAntigravityModelMapping = map[string]string{ // Claude 白名单 + "claude-opus-4-7": "claude-opus-4-7", // 官方模型 "claude-opus-4-6-thinking": "claude-opus-4-6-thinking", // 官方模型 "claude-opus-4-6": "claude-opus-4-6-thinking", // 简称映射 "claude-opus-4-5-thinking": "claude-opus-4-6-thinking", // 迁移旧模型 @@ -120,6 +121,7 @@ var DefaultAntigravityModelMapping = map[string]string{ // aws_region 自动调整为匹配的区域前缀(如 eu.、apac.、jp. 等) var DefaultBedrockModelMapping = map[string]string{ // Claude Opus + "claude-opus-4-7": "us.anthropic.claude-opus-4-7-v1", "claude-opus-4-6-thinking": "us.anthropic.claude-opus-4-6-v1", "claude-opus-4-6": "us.anthropic.claude-opus-4-6-v1", "claude-opus-4-5-thinking": "us.anthropic.claude-opus-4-5-20251101-v1:0", diff --git a/backend/internal/pkg/antigravity/claude_types.go b/backend/internal/pkg/antigravity/claude_types.go index 8ad1c434..89b8fab9 100644 --- a/backend/internal/pkg/antigravity/claude_types.go +++ b/backend/internal/pkg/antigravity/claude_types.go @@ -162,6 +162,7 @@ var claudeModels = []modelDef{ {ID: "claude-sonnet-4-5-thinking", DisplayName: "Claude Sonnet 4.5 Thinking", CreatedAt: "2025-09-29T00:00:00Z"}, {ID: "claude-opus-4-6", DisplayName: "Claude Opus 4.6", CreatedAt: "2026-02-05T00:00:00Z"}, {ID: "claude-opus-4-6-thinking", DisplayName: "Claude Opus 4.6 Thinking", CreatedAt: "2026-02-05T00:00:00Z"}, + {ID: "claude-opus-4-7", DisplayName: "Claude Opus 4.7", CreatedAt: "2026-04-17T00:00:00Z"}, {ID: "claude-sonnet-4-6", DisplayName: "Claude Sonnet 4.6", CreatedAt: "2026-02-17T00:00:00Z"}, } diff --git a/backend/internal/pkg/antigravity/request_transformer.go b/backend/internal/pkg/antigravity/request_transformer.go index 6d71b575..eadc6bba 100644 --- a/backend/internal/pkg/antigravity/request_transformer.go +++ b/backend/internal/pkg/antigravity/request_transformer.go @@ -1023,8 +1023,12 @@ func maxOutputTokensLimit(model string) int { return maxOutputTokensUpperBound } -func isAntigravityOpus46Model(model string) bool { - return strings.HasPrefix(strings.ToLower(model), "claude-opus-4-6") +// isAntigravityOpusHighTierModel 判断是否为高阶 Opus 模型(4.6+), +// 用于 adaptive thinking 时覆写为高预算。 +func isAntigravityOpusHighTierModel(model string) bool { + lower := strings.ToLower(model) + return strings.HasPrefix(lower, "claude-opus-4-6") || + strings.HasPrefix(lower, "claude-opus-4-7") } func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig { @@ -1046,12 +1050,12 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig { } // - thinking.type=enabled:budget_tokens>0 用显式预算 - // - thinking.type=adaptive:仅在 Antigravity 的 Opus 4.6 上覆写为 (24576) + // - thinking.type=adaptive:在 Antigravity 的高阶 Opus(4.6+)上覆写为 (24576) budget := -1 if req.Thinking.BudgetTokens > 0 { budget = req.Thinking.BudgetTokens } - if req.Thinking.Type == "adaptive" && isAntigravityOpus46Model(req.Model) { + if req.Thinking.Type == "adaptive" && isAntigravityOpusHighTierModel(req.Model) { budget = ClaudeAdaptiveHighThinkingBudgetTokens } diff --git a/backend/internal/pkg/claude/constants.go b/backend/internal/pkg/claude/constants.go index 1952e32b..b68c5ccd 100644 --- a/backend/internal/pkg/claude/constants.go +++ b/backend/internal/pkg/claude/constants.go @@ -341,6 +341,12 @@ var DefaultModels = []Model{ DisplayName: "Claude Opus 4.6", CreatedAt: "2026-02-06T00:00:00Z", }, + { + ID: "claude-opus-4-7", + Type: "model", + DisplayName: "Claude Opus 4.7", + CreatedAt: "2026-04-17T00:00:00Z", + }, { ID: "claude-sonnet-4-6", Type: "model", diff --git a/backend/internal/service/billing_service.go b/backend/internal/service/billing_service.go index 763abadb..32a54cbe 100644 --- a/backend/internal/service/billing_service.go +++ b/backend/internal/service/billing_service.go @@ -191,6 +191,9 @@ func (s *BillingService) initFallbackPricing() { // Claude 4.6 Opus (与4.5同价) s.fallbackPrices["claude-opus-4.6"] = s.fallbackPrices["claude-opus-4.5"] + // Claude 4.7 Opus (暂与4.6同价,待官方定价更新) + s.fallbackPrices["claude-opus-4.7"] = s.fallbackPrices["claude-opus-4.6"] + // Gemini 3.1 Pro s.fallbackPrices["gemini-3.1-pro"] = &ModelPricing{ InputPricePerToken: 2e-6, // $2 per MTok @@ -278,6 +281,9 @@ func (s *BillingService) getFallbackPricing(model string) *ModelPricing { // 按模型系列匹配 if strings.Contains(modelLower, "opus") { + if strings.Contains(modelLower, "4.7") || strings.Contains(modelLower, "4-7") { + return s.fallbackPrices["claude-opus-4.7"] + } if strings.Contains(modelLower, "4.6") || strings.Contains(modelLower, "4-6") { return s.fallbackPrices["claude-opus-4.6"] } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index cd0f213b..91740ad0 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -5180,19 +5180,8 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough( s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header) } - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "type": "error", - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } @@ -5558,19 +5547,8 @@ func (s *GatewayService) handleBedrockNonStreamingResponse( c *gin.Context, account *Account, ) (*ClaudeUsage, error) { - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "type": "error", - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } @@ -7277,19 +7255,8 @@ func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *h // 更新5h窗口状态 s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header) - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "type": "error", - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } @@ -8402,16 +8369,15 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, } // 读取响应体 - maxReadBytes := resolveUpstreamResponseReadLimit(s.cfg) - respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxReadBytes) + countTokensTooLarge := func(c *gin.Context) { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") + } + respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge) _ = resp.Body.Close() if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") - return err + if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") } - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") return err } @@ -8425,15 +8391,12 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, retryResp, retryErr := s.httpUpstream.DoWithTLS(retryReq, proxyURL, account.ID, account.Concurrency, s.tlsFPProfileService.ResolveTLSProfile(account)) if retryErr == nil { resp = retryResp - respBody, err = readUpstreamResponseBodyLimited(resp.Body, maxReadBytes) + respBody, err = ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge) _ = resp.Body.Close() if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") - return err + if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") } - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") return err } } @@ -8528,16 +8491,15 @@ func (s *GatewayService) forwardCountTokensAnthropicAPIKeyPassthrough(ctx contex return fmt.Errorf("upstream request failed: %w", err) } - maxReadBytes := resolveUpstreamResponseReadLimit(s.cfg) - respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxReadBytes) + countTokensTooLarge := func(c *gin.Context) { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") + } + respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge) _ = resp.Body.Close() if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large") - return err + if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") } - s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response") return err } diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 5a9490f3..7a24071b 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -2424,18 +2424,8 @@ func (s *GeminiMessagesCompatService) handleNativeNonStreamingResponse(c *gin.Co logger.LegacyPrintf("service.gemini_messages_compat", "[GeminiAPI] ========================================") } - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index a72b9bbf..2a0a72eb 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -121,6 +121,28 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( } } + // For API key accounts (including OpenAI-compatible upstream gateways), + // ensure promptCacheKey is also propagated via the request body so that + // upstreams using the Responses API can derive a stable session identifier + // from prompt_cache_key. This makes our Anthropic /v1/messages compatibility + // path behave more like a native Responses client. + if account.Type == AccountTypeAPIKey { + if trimmedKey := strings.TrimSpace(promptCacheKey); trimmedKey != "" { + var reqBody map[string]any + if err := json.Unmarshal(responsesBody, &reqBody); err != nil { + return nil, fmt.Errorf("unmarshal for prompt cache key injection: %w", err) + } + if existing, ok := reqBody["prompt_cache_key"].(string); !ok || strings.TrimSpace(existing) == "" { + reqBody["prompt_cache_key"] = trimmedKey + updated, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("remarshal after prompt cache key injection: %w", err) + } + responsesBody = updated + } + } + } + // 5. Get access token token, _, err := s.GetAccessToken(ctx, account) if err != nil { diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index ef97daad..064191bd 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -3010,18 +3010,8 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( resp *http.Response, c *gin.Context, ) (*OpenAIUsage, error) { - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } @@ -3919,18 +3909,8 @@ func extractOpenAIUsageFromJSONBytes(body []byte) (OpenAIUsage, bool) { } func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string) (*OpenAIUsage, error) { - maxBytes := resolveUpstreamResponseReadLimit(s.cfg) - body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) if err != nil { - if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { - setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") - c.JSON(http.StatusBadGateway, gin.H{ - "error": gin.H{ - "type": "upstream_error", - "message": "Upstream response too large", - }, - }) - } return nil, err } diff --git a/backend/internal/service/pricing_service.go b/backend/internal/service/pricing_service.go index 3b3f31c3..2bf48702 100644 --- a/backend/internal/service/pricing_service.go +++ b/backend/internal/service/pricing_service.go @@ -656,65 +656,95 @@ func (s *PricingService) extractBaseName(model string) string { // matchByModelFamily 基于模型系列匹配 func (s *PricingService) matchByModelFamily(model string) *LiteLLMModelPricing { - // Claude模型系列匹配规则 - familyPatterns := map[string][]string{ - "opus-4.6": {"claude-opus-4.6", "claude-opus-4-6"}, - "opus-4.5": {"claude-opus-4.5", "claude-opus-4-5"}, - "opus-4": {"claude-opus-4", "claude-3-opus"}, - "sonnet-4.5": {"claude-sonnet-4.5", "claude-sonnet-4-5"}, - "sonnet-4": {"claude-sonnet-4", "claude-3-5-sonnet"}, - "sonnet-3.5": {"claude-3-5-sonnet", "claude-3.5-sonnet"}, - "sonnet-3": {"claude-3-sonnet"}, - "haiku-3.5": {"claude-3-5-haiku", "claude-3.5-haiku"}, - "haiku-3": {"claude-3-haiku"}, + // modelFamily 定义一个模型系列的匹配和定价查找规则。 + type modelFamily struct { + name string // 系列名称 + match []string // 用于将模型归类到此系列的模式(strings.Contains 匹配) + pricing []string // 用于在定价数据中查找价格的模式(nil 则复用 match;可包含低版本 fallback) } - // 确定模型属于哪个系列 - var matchedFamily string - for family, patterns := range familyPatterns { - for _, pattern := range patterns { + // 按特异性降序排列:高版本号在前,避免 "claude-opus-4"(opus-4 系列) + // 因子串关系误匹配 "claude-opus-4-7"(opus-4.7 系列)。 + // 注意:原 map 实现存在 Go map 迭代随机性导致的同类 bug,此处改为有序切片修复。 + families := []modelFamily{ + {name: "opus-4.7", match: []string{"claude-opus-4-7", "claude-opus-4.7"}, pricing: []string{"claude-opus-4-7", "claude-opus-4.7", "claude-opus-4-6"}}, + {name: "opus-4.6", match: []string{"claude-opus-4-6", "claude-opus-4.6"}}, + {name: "opus-4.5", match: []string{"claude-opus-4-5", "claude-opus-4.5"}}, + {name: "opus-4", match: []string{"claude-opus-4", "claude-3-opus"}}, + {name: "sonnet-4.5", match: []string{"claude-sonnet-4-5", "claude-sonnet-4.5"}}, + {name: "sonnet-4", match: []string{"claude-sonnet-4", "claude-3-5-sonnet"}}, + {name: "sonnet-3.5", match: []string{"claude-3-5-sonnet", "claude-3.5-sonnet"}}, + {name: "sonnet-3", match: []string{"claude-3-sonnet"}}, + {name: "haiku-3.5", match: []string{"claude-3-5-haiku", "claude-3.5-haiku"}}, + {name: "haiku-3", match: []string{"claude-3-haiku"}}, + } + + // Phase 1: 按有序切片归类(最具体的系列优先匹配) + var matched *modelFamily + for i := range families { + for _, pattern := range families[i].match { if strings.Contains(model, pattern) || strings.Contains(model, strings.ReplaceAll(pattern, "-", "")) { - matchedFamily = family + matched = &families[i] break } } - if matchedFamily != "" { + if matched != nil { break } } - if matchedFamily == "" { - // 简单的系列匹配 - if strings.Contains(model, "opus") { - if strings.Contains(model, "4.5") || strings.Contains(model, "4-5") { - matchedFamily = "opus-4.5" - } else { - matchedFamily = "opus-4" + // Phase 2: 二次兜底——当模型 ID 不含已知模式串时,按关键字粗分 + if matched == nil { + var fallbackName string + switch { + case strings.Contains(model, "opus"): + switch { + case strings.Contains(model, "4.7") || strings.Contains(model, "4-7"): + fallbackName = "opus-4.7" + case strings.Contains(model, "4.6") || strings.Contains(model, "4-6"): + fallbackName = "opus-4.6" + case strings.Contains(model, "4.5") || strings.Contains(model, "4-5"): + fallbackName = "opus-4.5" + default: + fallbackName = "opus-4" } - } else if strings.Contains(model, "sonnet") { - if strings.Contains(model, "4.5") || strings.Contains(model, "4-5") { - matchedFamily = "sonnet-4.5" - } else if strings.Contains(model, "3-5") || strings.Contains(model, "3.5") { - matchedFamily = "sonnet-3.5" - } else { - matchedFamily = "sonnet-4" + case strings.Contains(model, "sonnet"): + switch { + case strings.Contains(model, "4.5") || strings.Contains(model, "4-5"): + fallbackName = "sonnet-4.5" + case strings.Contains(model, "3-5") || strings.Contains(model, "3.5"): + fallbackName = "sonnet-3.5" + default: + fallbackName = "sonnet-4" } - } else if strings.Contains(model, "haiku") { - if strings.Contains(model, "3-5") || strings.Contains(model, "3.5") { - matchedFamily = "haiku-3.5" - } else { - matchedFamily = "haiku-3" + case strings.Contains(model, "haiku"): + switch { + case strings.Contains(model, "3-5") || strings.Contains(model, "3.5"): + fallbackName = "haiku-3.5" + default: + fallbackName = "haiku-3" + } + } + if fallbackName != "" { + for i := range families { + if families[i].name == fallbackName { + matched = &families[i] + break + } } } } - if matchedFamily == "" { + if matched == nil { return nil } - // 在价格数据中查找该系列的模型 - patterns := familyPatterns[matchedFamily] - for _, pattern := range patterns { + // Phase 3: 在定价数据中查找该系列的价格 + lookups := matched.pricing + if lookups == nil { + lookups = matched.match + } + for _, pattern := range lookups { for key, pricing := range s.pricingData { keyLower := strings.ToLower(key) if strings.Contains(keyLower, pattern) { diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index 4d8009b7..53581574 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -152,6 +152,11 @@ func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Acc msg := "Credit balance exhausted (400): " + upstreamMsg s.handleAuthError(ctx, account, msg) shouldDisable = true + } else if strings.Contains(strings.ToLower(upstreamMsg), "identity verification is required") { + // KYC 身份验证要求 → 永久禁用,账号需完成身份验证后才能恢复 + msg := "Identity verification required (400): " + upstreamMsg + s.handleAuthError(ctx, account, msg) + shouldDisable = true } // 其他 400 错误(如参数问题)不处理,不禁用账号 case 401: diff --git a/backend/internal/service/scheduler_snapshot_service.go b/backend/internal/service/scheduler_snapshot_service.go index d1330abb..62b6993d 100644 --- a/backend/internal/service/scheduler_snapshot_service.go +++ b/backend/internal/service/scheduler_snapshot_service.go @@ -20,6 +20,14 @@ var ( const outboxEventTimeout = 2 * time.Minute +// batchSeenKey tracks which (groupID, platform) bucket sets have already been +// rebuilt within a single pollOutbox call, to avoid redundant work when multiple +// account_changed events share the same groups. +type batchSeenKey struct { + groupID int64 + platform string +} + type SchedulerSnapshotService struct { cache SchedulerCache outboxRepo SchedulerOutboxRepository @@ -244,9 +252,10 @@ func (s *SchedulerSnapshotService) pollOutbox() { } watermarkForCheck := watermark + seen := make(map[batchSeenKey]struct{}) for _, event := range events { eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout) - err := s.handleOutboxEvent(eventCtx, event) + err := s.handleOutboxEvent(eventCtx, event, seen) cancel() if err != nil { logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err) @@ -255,8 +264,20 @@ func (s *SchedulerSnapshotService) pollOutbox() { } lastID := events[len(events)-1].ID - if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil { - logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err) + var wmErr error + for i := range 3 { + wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second) + wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID) + wmCancel() + if wmErr == nil { + break + } + if i < 2 { + time.Sleep(200 * time.Millisecond) + } + } + if wmErr != nil { + logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr) } else { watermarkForCheck = lastID } @@ -264,18 +285,18 @@ func (s *SchedulerSnapshotService) pollOutbox() { s.checkOutboxLag(ctx, events[0], watermarkForCheck) } -func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent) error { +func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error { switch event.EventType { case SchedulerOutboxEventAccountLastUsed: return s.handleLastUsedEvent(ctx, event.Payload) case SchedulerOutboxEventAccountBulkChanged: - return s.handleBulkAccountEvent(ctx, event.Payload) + return s.handleBulkAccountEvent(ctx, event.Payload, seen) case SchedulerOutboxEventAccountGroupsChanged: - return s.handleAccountEvent(ctx, event.AccountID, event.Payload) + return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen) case SchedulerOutboxEventAccountChanged: - return s.handleAccountEvent(ctx, event.AccountID, event.Payload) + return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen) case SchedulerOutboxEventGroupChanged: - return s.handleGroupEvent(ctx, event.GroupID) + return s.handleGroupEvent(ctx, event.GroupID, seen) case SchedulerOutboxEventFullRebuild: return s.triggerFullRebuild("outbox") default: @@ -309,7 +330,7 @@ func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payl return s.cache.UpdateLastUsed(ctx, updates) } -func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any) error { +func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error { if payload == nil { return nil } @@ -323,15 +344,15 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p } ids := make([]int64, 0, len(rawIDs)) - seen := make(map[int64]struct{}, len(rawIDs)) + seenIDs := make(map[int64]struct{}, len(rawIDs)) for _, id := range rawIDs { if id <= 0 { continue } - if _, exists := seen[id]; exists { + if _, exists := seenIDs[id]; exists { continue } - seen[id] = struct{}{} + seenIDs[id] = struct{}{} ids = append(ids, id) } if len(ids) == 0 { @@ -384,10 +405,10 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p for gid := range rebuildGroupSet { rebuildGroupIDs = append(rebuildGroupIDs, gid) } - return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change") + return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen) } -func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any) error { +func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error { if accountID == nil || *accountID <= 0 { return nil } @@ -408,7 +429,7 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou return err } } - return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss") + return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen) } return err } @@ -420,18 +441,18 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou if len(groupIDs) == 0 { groupIDs = account.GroupIDs } - return s.rebuildByAccount(ctx, account, groupIDs, "account_change") + return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen) } -func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64) error { +func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error { if groupID == nil || *groupID <= 0 { return nil } groupIDs := []int64{*groupID} - return s.rebuildByGroupIDs(ctx, groupIDs, "group_change") + return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen) } -func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { if account == nil { return nil } @@ -441,21 +462,21 @@ func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account } var firstErr error - if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() { - if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } - if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } } return firstErr } -func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { groupIDs = s.normalizeGroupIDs(groupIDs) if len(groupIDs) == 0 { return nil @@ -463,19 +484,30 @@ func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupI platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity} var firstErr error for _, platform := range platforms { - if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } } return firstErr } -func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { if platform == "" { return nil } var firstErr error for _, gid := range groupIDs { + // Within a single poll batch, skip (groupID, platform) pairs that were + // already rebuilt. The first rebuild loads fresh DB data for all accounts + // in the group, so subsequent rebuilds for the same group+platform within + // the same batch are redundant. + if seen != nil { + key := batchSeenKey{gid, platform} + if _, exists := seen[key]; exists { + continue + } + seen[key] = struct{}{} + } if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil { firstErr = err } diff --git a/backend/internal/service/upstream_response_limit.go b/backend/internal/service/upstream_response_limit.go index aecf69a3..a0444d52 100644 --- a/backend/internal/service/upstream_response_limit.go +++ b/backend/internal/service/upstream_response_limit.go @@ -4,8 +4,10 @@ import ( "errors" "fmt" "io" + "net/http" "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/gin-gonic/gin" ) var ErrUpstreamResponseBodyTooLarge = errors.New("upstream response body too large") @@ -36,3 +38,44 @@ func readUpstreamResponseBodyLimited(reader io.Reader, maxBytes int64) ([]byte, } return body, nil } + +// TooLargeWriter 在响应超限时向客户端写格式化的错误响应。 +type TooLargeWriter func(c *gin.Context) + +// ReadUpstreamResponseBody 读取上游非流式响应体。 +// 超限时自动记录 ops error 并调用 onTooLarge 向客户端写错误。 +func ReadUpstreamResponseBody(reader io.Reader, cfg *config.Config, c *gin.Context, onTooLarge TooLargeWriter) ([]byte, error) { + maxBytes := resolveUpstreamResponseReadLimit(cfg) + body, err := readUpstreamResponseBodyLimited(reader, maxBytes) + if err != nil { + if errors.Is(err, ErrUpstreamResponseBodyTooLarge) { + setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "") + if onTooLarge != nil { + onTooLarge(c) + } + } + return nil, err + } + return body, nil +} + +// anthropicTooLargeError 以 Anthropic Messages API 格式写入超限错误。 +func anthropicTooLargeError(c *gin.Context) { + c.JSON(http.StatusBadGateway, gin.H{ + "type": "error", + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream response too large", + }, + }) +} + +// openAITooLargeError 以 OpenAI / Gemini 格式写入超限错误。 +func openAITooLargeError(c *gin.Context) { + c.JSON(http.StatusBadGateway, gin.H{ + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream response too large", + }, + }) +} diff --git a/backend/internal/service/upstream_response_limit_test.go b/backend/internal/service/upstream_response_limit_test.go index b9e5cc6d..09283189 100644 --- a/backend/internal/service/upstream_response_limit_test.go +++ b/backend/internal/service/upstream_response_limit_test.go @@ -4,8 +4,10 @@ import ( "bytes" "errors" "testing" + "testing/iotest" "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/gin-gonic/gin" "github.com/stretchr/testify/require" ) @@ -35,3 +37,44 @@ func TestReadUpstreamResponseBodyLimited(t *testing.T) { require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge)) }) } + +func TestReadUpstreamResponseBody(t *testing.T) { + t.Run("within limit", func(t *testing.T) { + body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("ok")), nil, nil, nil) + require.NoError(t, err) + require.Equal(t, []byte("ok"), body) + }) + + t.Run("exceeds limit calls onTooLarge", func(t *testing.T) { + cfg := &config.Config{} + cfg.Gateway.UpstreamResponseReadMaxBytes = 3 + + called := false + onTooLarge := func(_ *gin.Context) { called = true } + + body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("toolong")), cfg, nil, onTooLarge) + require.Nil(t, body) + require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge)) + require.True(t, called) + }) + + t.Run("nil onTooLarge does not panic", func(t *testing.T) { + cfg := &config.Config{} + cfg.Gateway.UpstreamResponseReadMaxBytes = 3 + + body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("toolong")), cfg, nil, nil) + require.Nil(t, body) + require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge)) + }) + + t.Run("io error does not call onTooLarge", func(t *testing.T) { + called := false + onTooLarge := func(_ *gin.Context) { called = true } + + body, err := ReadUpstreamResponseBody(iotest.ErrReader(errors.New("disk failure")), nil, nil, onTooLarge) + require.Nil(t, body) + require.Error(t, err) + require.False(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge)) + require.False(t, called) + }) +} diff --git a/frontend/src/composables/useModelWhitelist.ts b/frontend/src/composables/useModelWhitelist.ts index d16ed977..a282ae7d 100644 --- a/frontend/src/composables/useModelWhitelist.ts +++ b/frontend/src/composables/useModelWhitelist.ts @@ -43,6 +43,7 @@ export const claudeModels = [ 'claude-sonnet-4-5-20250929', 'claude-haiku-4-5-20251001', 'claude-opus-4-5-20251101', 'claude-opus-4-6', + 'claude-opus-4-7', 'claude-sonnet-4-6', 'claude-2.1', 'claude-2.0', 'claude-instant-1.2' ] @@ -66,6 +67,7 @@ const antigravityModels = [ // Claude 4.5+ 系列 'claude-opus-4-6', 'claude-opus-4-6-thinking', + 'claude-opus-4-7', 'claude-opus-4-5-thinking', 'claude-sonnet-4-6', 'claude-sonnet-4-5', @@ -250,6 +252,7 @@ const anthropicPresetMappings = [ { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'claude-sonnet-4-6', color: 'bg-indigo-100 text-indigo-700 hover:bg-indigo-200 dark:bg-indigo-900/30 dark:text-indigo-400' }, { label: 'Opus 4.5', from: 'claude-opus-4-5-20251101', to: 'claude-opus-4-5-20251101', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' }, { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'claude-opus-4-6', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' }, + { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'claude-opus-4-7', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' }, { label: 'Haiku 3.5', from: 'claude-3-5-haiku-20241022', to: 'claude-3-5-haiku-20241022', color: 'bg-green-100 text-green-700 hover:bg-green-200 dark:bg-green-900/30 dark:text-green-400' }, { label: 'Haiku 4.5', from: 'claude-haiku-4-5-20251001', to: 'claude-haiku-4-5-20251001', color: 'bg-emerald-100 text-emerald-700 hover:bg-emerald-200 dark:bg-emerald-900/30 dark:text-emerald-400' }, { label: 'Opus->Sonnet', from: 'claude-opus-4-6', to: 'claude-sonnet-4-5-20250929', color: 'bg-amber-100 text-amber-700 hover:bg-amber-200 dark:bg-amber-900/30 dark:text-amber-400' } @@ -309,12 +312,14 @@ const antigravityPresetMappings = [ { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'claude-sonnet-4-6', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' }, { label: 'Sonnet 4.5', from: 'claude-sonnet-4-5', to: 'claude-sonnet-4-5', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' }, { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }, - { label: 'Opus 4.6-thinking', from: 'claude-opus-4-6-thinking', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' } + { label: 'Opus 4.6-thinking', from: 'claude-opus-4-6-thinking', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }, + { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'claude-opus-4-7', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' } ] // Bedrock 预设映射(与后端 DefaultBedrockModelMapping 保持一致) const bedrockPresetMappings = [ { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'us.anthropic.claude-opus-4-6-v1', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }, + { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'us.anthropic.claude-opus-4-7-v1', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }, { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'us.anthropic.claude-sonnet-4-6', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' }, { label: 'Opus 4.5', from: 'claude-opus-4-5-thinking', to: 'us.anthropic.claude-opus-4-5-20251101-v1:0', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }, { label: 'Sonnet 4.5', from: 'claude-sonnet-4-5', to: 'us.anthropic.claude-sonnet-4-5-20250929-v1:0', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' },