From 2eb622f2f6b27c2bffbd51a52ce7ac6b8c541e73 Mon Sep 17 00:00:00 2001 From: name <136912576+is7Qin@users.noreply.github.com> Date: Tue, 19 May 2026 19:37:41 +0800 Subject: [PATCH] Remove ops retry replay storage --- backend/internal/handler/admin/ops_handler.go | 206 +---- backend/internal/handler/gateway_handler.go | 8 +- .../gateway_handler_chat_completions.go | 4 +- .../handler/gateway_handler_responses.go | 4 +- .../internal/handler/gemini_v1beta_handler.go | 2 +- .../handler/openai_chat_completions.go | 2 +- .../handler/openai_gateway_handler.go | 8 +- backend/internal/handler/openai_images.go | 8 +- backend/internal/handler/ops_error_logger.go | 90 +-- .../internal/handler/ops_error_logger_test.go | 76 -- backend/internal/repository/ops_repo.go | 458 +---------- .../ops_repo_replay_cleanup_test.go | 44 ++ backend/internal/server/routes/admin.go | 5 - .../service/antigravity_gateway_service.go | 5 - ...teway_anthropic_apikey_passthrough_test.go | 9 - backend/internal/service/gateway_service.go | 7 - .../gemini_chat_completions_compat_service.go | 4 - .../service/gemini_messages_compat_service.go | 12 - .../service/openai_gateway_service.go | 23 +- backend/internal/service/openai_images.go | 6 +- .../service/openai_images_responses.go | 2 - .../internal/service/ops_cleanup_executor.go | 4 +- .../internal/service/ops_cleanup_service.go | 1 - backend/internal/service/ops_models.go | 62 -- backend/internal/service/ops_port.go | 48 +- .../internal/service/ops_repo_mock_test.go | 18 +- backend/internal/service/ops_retry.go | 726 ------------------ .../service/ops_retry_context_test.go | 47 -- backend/internal/service/ops_service.go | 85 +- .../service/ops_service_batch_test.go | 9 +- .../service/ops_service_prepare_queue_test.go | 60 -- .../service/ops_service_redaction_test.go | 4 +- .../internal/service/ops_upstream_context.go | 31 - .../service/ops_upstream_context_test.go | 40 - backend/internal/service/slice_helpers.go | 10 + .../136_remove_ops_retry_replay.sql | 16 + 36 files changed, 133 insertions(+), 2011 deletions(-) create mode 100644 backend/internal/repository/ops_repo_replay_cleanup_test.go delete mode 100644 backend/internal/service/ops_retry.go delete mode 100644 backend/internal/service/ops_retry_context_test.go delete mode 100644 backend/internal/service/ops_service_prepare_queue_test.go create mode 100644 backend/internal/service/slice_helpers.go create mode 100644 backend/migrations/136_remove_ops_retry_replay.sql diff --git a/backend/internal/handler/admin/ops_handler.go b/backend/internal/handler/admin/ops_handler.go index 44accc8f..418c302f 100644 --- a/backend/internal/handler/admin/ops_handler.go +++ b/backend/internal/handler/admin/ops_handler.go @@ -1,9 +1,7 @@ package admin import ( - "errors" "fmt" - "io" "net/http" "strconv" "strings" @@ -384,79 +382,6 @@ func (h *OpsHandler) ListRequestErrorUpstreamErrors(c *gin.Context) { response.Paginated(c, result.Errors, int64(result.Total), result.Page, result.PageSize) } -// RetryRequestErrorClient retries the client request based on stored request body. -// POST /api/v1/admin/ops/request-errors/:id/retry-client -func (h *OpsHandler) RetryRequestErrorClient(c *gin.Context) { - if h.opsService == nil { - response.Error(c, http.StatusServiceUnavailable, "Ops service not available") - return - } - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - - subject, ok := middleware.GetAuthSubjectFromContext(c) - if !ok || subject.UserID <= 0 { - response.Error(c, http.StatusUnauthorized, "Unauthorized") - return - } - - idStr := strings.TrimSpace(c.Param("id")) - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - response.BadRequest(c, "Invalid error id") - return - } - - result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeClient, nil) - if err != nil { - response.ErrorFrom(c, err) - return - } - response.Success(c, result) -} - -// RetryRequestErrorUpstreamEvent retries a specific upstream attempt using captured upstream_request_body. -// POST /api/v1/admin/ops/request-errors/:id/upstream-errors/:idx/retry -func (h *OpsHandler) RetryRequestErrorUpstreamEvent(c *gin.Context) { - if h.opsService == nil { - response.Error(c, http.StatusServiceUnavailable, "Ops service not available") - return - } - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - - subject, ok := middleware.GetAuthSubjectFromContext(c) - if !ok || subject.UserID <= 0 { - response.Error(c, http.StatusUnauthorized, "Unauthorized") - return - } - - idStr := strings.TrimSpace(c.Param("id")) - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - response.BadRequest(c, "Invalid error id") - return - } - - idxStr := strings.TrimSpace(c.Param("idx")) - idx, err := strconv.Atoi(idxStr) - if err != nil || idx < 0 { - response.BadRequest(c, "Invalid upstream idx") - return - } - - result, err := h.opsService.RetryUpstreamEvent(c.Request.Context(), subject.UserID, id, idx) - if err != nil { - response.ErrorFrom(c, err) - return - } - response.Success(c, result) -} - // ResolveRequestError toggles resolved status. // PUT /api/v1/admin/ops/request-errors/:id/resolve func (h *OpsHandler) ResolveRequestError(c *gin.Context) { @@ -564,39 +489,6 @@ func (h *OpsHandler) GetUpstreamError(c *gin.Context) { h.GetErrorLogByID(c) } -// RetryUpstreamError retries upstream error using the original account_id. -// POST /api/v1/admin/ops/upstream-errors/:id/retry -func (h *OpsHandler) RetryUpstreamError(c *gin.Context) { - if h.opsService == nil { - response.Error(c, http.StatusServiceUnavailable, "Ops service not available") - return - } - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - - subject, ok := middleware.GetAuthSubjectFromContext(c) - if !ok || subject.UserID <= 0 { - response.Error(c, http.StatusUnauthorized, "Unauthorized") - return - } - - idStr := strings.TrimSpace(c.Param("id")) - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - response.BadRequest(c, "Invalid error id") - return - } - - result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeUpstream, nil) - if err != nil { - response.ErrorFrom(c, err) - return - } - response.Success(c, result) -} - // ResolveUpstreamError toggles resolved status. // PUT /api/v1/admin/ops/upstream-errors/:id/resolve func (h *OpsHandler) ResolveUpstreamError(c *gin.Context) { @@ -706,106 +598,10 @@ func (h *OpsHandler) ListRequestDetails(c *gin.Context) { response.Paginated(c, out.Items, out.Total, out.Page, out.PageSize) } -type opsRetryRequest struct { - Mode string `json:"mode"` - PinnedAccountID *int64 `json:"pinned_account_id"` - Force bool `json:"force"` -} - type opsResolveRequest struct { Resolved bool `json:"resolved"` } -// RetryErrorRequest retries a failed request using stored request_body. -// POST /api/v1/admin/ops/errors/:id/retry -func (h *OpsHandler) RetryErrorRequest(c *gin.Context) { - if h.opsService == nil { - response.Error(c, http.StatusServiceUnavailable, "Ops service not available") - return - } - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - - subject, ok := middleware.GetAuthSubjectFromContext(c) - if !ok || subject.UserID <= 0 { - response.Error(c, http.StatusUnauthorized, "Unauthorized") - return - } - - idStr := strings.TrimSpace(c.Param("id")) - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - response.BadRequest(c, "Invalid error id") - return - } - - req := opsRetryRequest{Mode: service.OpsRetryModeClient} - if err := c.ShouldBindJSON(&req); err != nil && !errors.Is(err, io.EOF) { - response.BadRequest(c, "Invalid request: "+err.Error()) - return - } - if strings.TrimSpace(req.Mode) == "" { - req.Mode = service.OpsRetryModeClient - } - - // Force flag is currently a UI-level acknowledgement. Server may still enforce safety constraints. - _ = req.Force - - // Legacy endpoint safety: only allow retrying the client request here. - // Upstream retries must go through the split endpoints. - if strings.EqualFold(strings.TrimSpace(req.Mode), service.OpsRetryModeUpstream) { - response.BadRequest(c, "upstream retry is not supported on this endpoint") - return - } - - result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, req.Mode, req.PinnedAccountID) - if err != nil { - response.ErrorFrom(c, err) - return - } - - response.Success(c, result) -} - -// ListRetryAttempts lists retry attempts for an error log. -// GET /api/v1/admin/ops/errors/:id/retries -func (h *OpsHandler) ListRetryAttempts(c *gin.Context) { - if h.opsService == nil { - response.Error(c, http.StatusServiceUnavailable, "Ops service not available") - return - } - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - - idStr := strings.TrimSpace(c.Param("id")) - id, err := strconv.ParseInt(idStr, 10, 64) - if err != nil || id <= 0 { - response.BadRequest(c, "Invalid error id") - return - } - - limit := 50 - if v := strings.TrimSpace(c.Query("limit")); v != "" { - n, err := strconv.Atoi(v) - if err != nil || n <= 0 { - response.BadRequest(c, "Invalid limit") - return - } - limit = n - } - - items, err := h.opsService.ListRetryAttemptsByErrorID(c.Request.Context(), id, limit) - if err != nil { - response.ErrorFrom(c, err) - return - } - response.Success(c, items) -} - // UpdateErrorResolution allows manual resolve/unresolve. // PUT /api/v1/admin/ops/errors/:id/resolve func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) { @@ -837,7 +633,7 @@ func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) { return } uid := subject.UserID - if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid, nil); err != nil { + if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid); err != nil { response.ErrorFrom(c, err) return } diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 0c88ebb4..733be95d 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -151,7 +151,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { return } - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic) if err != nil { @@ -184,7 +184,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用 c.Request = c.Request.WithContext(service.WithThinkingEnabled(c.Request.Context(), parsedReq.ThinkingEnabled, h.metadataBridgeEnabled())) - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) // 验证 model 必填 @@ -1512,7 +1512,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) { return } - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic) if err != nil { @@ -1531,7 +1531,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) { return } - setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream, body) + setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsedReq.Stream, false))) // 获取订阅信息(可能为nil) diff --git a/backend/internal/handler/gateway_handler_chat_completions.go b/backend/internal/handler/gateway_handler_chat_completions.go index 7d2c2b1d..9a091fcd 100644 --- a/backend/internal/handler/gateway_handler_chat_completions.go +++ b/backend/internal/handler/gateway_handler_chat_completions.go @@ -60,7 +60,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { return } - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) // Validate JSON if !gjson.ValidBytes(body) { @@ -78,7 +78,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { reqStream := gjson.GetBytes(body, "stream").Bool() reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream)) - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) // 解析渠道级模型映射 diff --git a/backend/internal/handler/gateway_handler_responses.go b/backend/internal/handler/gateway_handler_responses.go index 03246f8b..e1a5b723 100644 --- a/backend/internal/handler/gateway_handler_responses.go +++ b/backend/internal/handler/gateway_handler_responses.go @@ -60,7 +60,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { return } - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) // Validate JSON if !gjson.ValidBytes(body) { @@ -78,7 +78,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { reqStream := gjson.GetBytes(body, "stream").Bool() reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream)) - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) // 解析渠道级模型映射 diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 3395eeec..665c0677 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -184,7 +184,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { return } - setOpsRequestContext(c, modelName, stream, body) + setOpsRequestContext(c, modelName, stream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(stream, false))) if decision := h.checkContentModeration(c, reqLog, apiKey, authSubject, service.ContentModerationProtocolGemini, modelName, body); decision != nil && decision.Blocked { diff --git a/backend/internal/handler/openai_chat_completions.go b/backend/internal/handler/openai_chat_completions.go index f78c63a2..f7269214 100644 --- a/backend/internal/handler/openai_chat_completions.go +++ b/backend/internal/handler/openai_chat_completions.go @@ -78,7 +78,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) { reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream)) - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIChat, reqModel, body); decision != nil && decision.Blocked { diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index d9e81d4d..e7ba699d 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -130,7 +130,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { return } - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) sessionHashBody := body if service.IsOpenAIResponsesCompactPathForTest(c) { if compactSeed := strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String()); compactSeed != "" { @@ -189,7 +189,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { return } - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, body); decision != nil && decision.Blocked { @@ -611,7 +611,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream)) - setOpsRequestContext(c, reqModel, reqStream, body) + setOpsRequestContext(c, reqModel, reqStream) setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false))) if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolAnthropicMessages, reqModel, body); decision != nil && decision.Blocked { @@ -1174,7 +1174,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) { zap.Bool("has_previous_response_id", previousResponseID != ""), zap.String("previous_response_id_kind", previousResponseIDKind), ) - setOpsRequestContext(c, reqModel, true, firstMessage) + setOpsRequestContext(c, reqModel, true) setOpsEndpointContext(c, "", int16(service.RequestTypeWSV2)) if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, firstMessage); decision != nil && decision.Blocked { diff --git a/backend/internal/handler/openai_images.go b/backend/internal/handler/openai_images.go index be19a035..1a81a59e 100644 --- a/backend/internal/handler/openai_images.go +++ b/backend/internal/handler/openai_images.go @@ -63,9 +63,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) { } if isMultipartImagesContentType(c.GetHeader("Content-Type")) { - setOpsRequestContext(c, "", false, nil) + setOpsRequestContext(c, "", false) } else { - setOpsRequestContext(c, "", false, body) + setOpsRequestContext(c, "", false) } parsed, err := h.gatewayService.ParseOpenAIImagesRequest(c, body) @@ -98,9 +98,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) { } if parsed.Multipart { - setOpsRequestContext(c, parsed.Model, parsed.Stream, nil) + setOpsRequestContext(c, parsed.Model, parsed.Stream) } else { - setOpsRequestContext(c, parsed.Model, parsed.Stream, body) + setOpsRequestContext(c, parsed.Model, parsed.Stream) } setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsed.Stream, false))) diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index c8803aab..1d34a062 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -25,7 +25,6 @@ import ( const ( opsModelKey = "ops_model" opsStreamKey = "ops_stream" - opsRequestBodyKey = "ops_request_body" opsAccountIDKey = "ops_account_id" opsRoutingCapacityLimitedKey = "ops_routing_capacity_limited" @@ -336,16 +335,13 @@ func opsErrorLogConfig() (workerCount int, queueSize int) { return workerCount, queueSize } -func setOpsRequestContext(c *gin.Context, model string, stream bool, requestBody []byte) { +func setOpsRequestContext(c *gin.Context, model string, stream bool) { if c == nil { return } model = strings.TrimSpace(model) c.Set(opsModelKey, model) c.Set(opsStreamKey, stream) - if len(requestBody) > 0 { - c.Set(opsRequestBodyKey, requestBody) - } if c.Request != nil && model != "" { ctx := context.WithValue(c.Request.Context(), ctxkey.Model, model) c.Request = c.Request.WithContext(ctx) @@ -364,22 +360,6 @@ func setOpsEndpointContext(c *gin.Context, upstreamModel string, requestType int c.Set(opsRequestTypeKey, requestType) } -func attachOpsRequestBodyToEntry(c *gin.Context, entry *service.OpsInsertErrorLogInput) { - if c == nil || entry == nil { - return - } - v, ok := c.Get(opsRequestBodyKey) - if !ok { - return - } - raw, ok := v.([]byte) - if !ok || len(raw) == 0 { - return - } - entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = service.PrepareOpsRequestBodyForQueue(raw) - opsErrorLogSanitized.Add(1) -} - func setOpsSelectedAccount(c *gin.Context, accountID int64, platform ...string) { if c == nil || accountID <= 0 { return @@ -711,7 +691,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { ErrorPhase: "upstream", ErrorType: "upstream_error", - // Severity/retryability should reflect the upstream failure, not the final client status (200). + // Severity should reflect the upstream failure, not the final client status (200). Severity: classifyOpsSeverity("upstream_error", effectiveUpstreamStatus), StatusCode: status, IsBusinessLimited: false, @@ -728,9 +708,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { UpstreamErrorDetail: upstreamErrorDetail, UpstreamErrors: events, - IsRetryable: classifyOpsIsRetryable("upstream_error", effectiveUpstreamStatus), - RetryCount: 0, - CreatedAt: time.Now(), + CreatedAt: time.Now(), } applyOpsLatencyFieldsFromContext(c, entry) @@ -754,10 +732,6 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { entry.ClientIP = &clientIP } - // Store request headers/body only when an upstream error occurred to keep overhead minimal. - entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c) - attachOpsRequestBodyToEntry(c, entry) - // Skip logging if a passthrough rule with skip_monitoring=true matched. if v, ok := c.Get(service.OpsSkipPassthroughKey); ok { if skip, _ := v.(bool); skip { @@ -870,9 +844,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { ErrorSource: errorSource, ErrorOwner: errorOwner, - IsRetryable: classifyOpsIsRetryable(normalizedType, status), - RetryCount: 0, - CreatedAt: time.Now(), + CreatedAt: time.Now(), } applyOpsLatencyFieldsFromContext(c, entry) @@ -950,20 +922,10 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { entry.ClientIP = &clientIP } - // Persist only a minimal, whitelisted set of request headers to improve retry fidelity. - // Do NOT store Authorization/Cookie/etc. - entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c) - attachOpsRequestBodyToEntry(c, entry) - enqueueOpsErrorLog(ops, entry) } } -var opsRetryRequestHeaderAllowlist = []string{ - "anthropic-beta", - "anthropic-version", -} - // isCountTokensRequest checks if the request is a count_tokens request func isCountTokensRequest(c *gin.Context) bool { if c == nil || c.Request == nil || c.Request.URL == nil { @@ -972,32 +934,6 @@ func isCountTokensRequest(c *gin.Context) bool { return strings.Contains(c.Request.URL.Path, "/count_tokens") } -func extractOpsRetryRequestHeaders(c *gin.Context) *string { - if c == nil || c.Request == nil { - return nil - } - - headers := make(map[string]string, 4) - for _, key := range opsRetryRequestHeaderAllowlist { - v := strings.TrimSpace(c.GetHeader(key)) - if v == "" { - continue - } - // Keep headers small even if a client sends something unexpected. - headers[key] = truncateString(v, 512) - } - if len(headers) == 0 { - return nil - } - - raw, err := json.Marshal(headers) - if err != nil { - return nil - } - s := string(raw) - return &s -} - func applyOpsLatencyFieldsFromContext(c *gin.Context, entry *service.OpsInsertErrorLogInput) { if c == nil || entry == nil { return @@ -1199,24 +1135,6 @@ func classifyOpsSeverity(errType string, status int) string { return "P3" } -func classifyOpsIsRetryable(errType string, statusCode int) bool { - switch errType { - case "authentication_error", "invalid_request_error": - return false - case "timeout_error": - return true - case "rate_limit_error": - // May be transient (upstream or queue); retry can help. - return true - case "billing_error", "subscription_error": - return false - case "upstream_error", "overloaded_error": - return statusCode >= 500 || statusCode == 429 || statusCode == 529 - default: - return statusCode >= 500 - } -} - func classifyOpsErrorLog(c *gin.Context, errType, message, code string, status int) (phase string, isBusinessLimited bool, errorOwner string, errorSource string) { phase = classifyOpsPhase(errType, message, code) routingCapacityLimited := isOpsRoutingCapacityLimited(c) diff --git a/backend/internal/handler/ops_error_logger_test.go b/backend/internal/handler/ops_error_logger_test.go index 2a141fdf..99a9af2f 100644 --- a/backend/internal/handler/ops_error_logger_test.go +++ b/backend/internal/handler/ops_error_logger_test.go @@ -44,49 +44,6 @@ func resetOpsErrorLoggerStateForTest(t *testing.T) { opsErrorLogDrained.Store(false) } -func TestAttachOpsRequestBodyToEntry_SanitizeAndTrim(t *testing.T) { - resetOpsErrorLoggerStateForTest(t) - gin.SetMode(gin.TestMode) - - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) - - raw := []byte(`{"access_token":"secret-token","messages":[{"role":"user","content":"hello"}]}`) - setOpsRequestContext(c, "claude-3", false, raw) - - entry := &service.OpsInsertErrorLogInput{} - attachOpsRequestBodyToEntry(c, entry) - - require.NotNil(t, entry.RequestBodyBytes) - require.Equal(t, len(raw), *entry.RequestBodyBytes) - require.NotNil(t, entry.RequestBodyJSON) - require.NotContains(t, *entry.RequestBodyJSON, "secret-token") - require.Contains(t, *entry.RequestBodyJSON, "[REDACTED]") - require.Equal(t, int64(1), OpsErrorLogSanitizedTotal()) -} - -func TestAttachOpsRequestBodyToEntry_InvalidJSONKeepsSize(t *testing.T) { - resetOpsErrorLoggerStateForTest(t) - gin.SetMode(gin.TestMode) - - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) - - raw := []byte("not-json") - setOpsRequestContext(c, "claude-3", false, raw) - - entry := &service.OpsInsertErrorLogInput{} - attachOpsRequestBodyToEntry(c, entry) - - require.Nil(t, entry.RequestBodyJSON) - require.NotNil(t, entry.RequestBodyBytes) - require.Equal(t, len(raw), *entry.RequestBodyBytes) - require.False(t, entry.RequestBodyTruncated) - require.Equal(t, int64(1), OpsErrorLogSanitizedTotal()) -} - func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) { resetOpsErrorLoggerStateForTest(t) @@ -108,39 +65,6 @@ func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) { require.Equal(t, int64(1), OpsErrorLogQueueLength()) } -func TestAttachOpsRequestBodyToEntry_EarlyReturnBranches(t *testing.T) { - resetOpsErrorLoggerStateForTest(t) - gin.SetMode(gin.TestMode) - - entry := &service.OpsInsertErrorLogInput{} - attachOpsRequestBodyToEntry(nil, entry) - attachOpsRequestBodyToEntry(&gin.Context{}, nil) - - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) - - // 无请求体 key - attachOpsRequestBodyToEntry(c, entry) - require.Nil(t, entry.RequestBodyJSON) - require.Nil(t, entry.RequestBodyBytes) - require.False(t, entry.RequestBodyTruncated) - - // 错误类型 - c.Set(opsRequestBodyKey, "not-bytes") - attachOpsRequestBodyToEntry(c, entry) - require.Nil(t, entry.RequestBodyJSON) - require.Nil(t, entry.RequestBodyBytes) - - // 空 bytes - c.Set(opsRequestBodyKey, []byte{}) - attachOpsRequestBodyToEntry(c, entry) - require.Nil(t, entry.RequestBodyJSON) - require.Nil(t, entry.RequestBodyBytes) - - require.Equal(t, int64(0), OpsErrorLogSanitizedTotal()) -} - func TestEnqueueOpsErrorLog_EarlyReturnBranches(t *testing.T) { resetOpsErrorLoggerStateForTest(t) diff --git a/backend/internal/repository/ops_repo.go b/backend/internal/repository/ops_repo.go index 5154b269..4371b8a2 100644 --- a/backend/internal/repository/ops_repo.go +++ b/backend/internal/repository/ops_repo.go @@ -54,15 +54,9 @@ INSERT INTO ops_error_logs ( upstream_latency_ms, response_latency_ms, time_to_first_token_ms, - request_body, - request_body_truncated, - request_body_bytes, - request_headers, - is_retryable, - retry_count, created_at ) VALUES ( - $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39,$40,$41,$42,$43 + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37 )` func NewOpsRepository(db *sql.DB) service.OpsRepository { @@ -170,12 +164,6 @@ func opsInsertErrorLogArgs(input *service.OpsInsertErrorLogInput) []any { opsNullInt64(input.UpstreamLatencyMs), opsNullInt64(input.ResponseLatencyMs), opsNullInt64(input.TimeToFirstTokenMs), - opsNullString(input.RequestBodyJSON), - input.RequestBodyTruncated, - opsNullInt(input.RequestBodyBytes), - opsNullString(input.RequestHeadersJSON), - input.IsRetryable, - input.RetryCount, input.CreatedAt, } } @@ -222,13 +210,10 @@ SELECT COALESCE(e.upstream_status_code, e.status_code, 0), COALESCE(e.platform, ''), COALESCE(e.model, ''), - COALESCE(e.is_retryable, false), - COALESCE(e.retry_count, 0), COALESCE(e.resolved, false), e.resolved_at, e.resolved_by_user_id, COALESCE(u2.email, ''), - e.resolved_retry_id, COALESCE(e.client_request_id, ''), COALESCE(e.request_id, ''), COALESCE(e.error_message, ''), @@ -277,7 +262,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2) var resolvedAt sql.NullTime var resolvedBy sql.NullInt64 var resolvedByName string - var resolvedRetryID sql.NullInt64 var requestType sql.NullInt64 if err := rows.Scan( &item.ID, @@ -290,13 +274,10 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2) &statusCode, &item.Platform, &item.Model, - &item.IsRetryable, - &item.RetryCount, &item.Resolved, &resolvedAt, &resolvedBy, &resolvedByName, - &resolvedRetryID, &item.ClientRequestID, &item.RequestID, &item.Message, @@ -327,10 +308,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2) item.ResolvedByUserID = &v } item.ResolvedByUserName = resolvedByName - if resolvedRetryID.Valid { - v := resolvedRetryID.Int64 - item.ResolvedRetryID = &v - } item.StatusCode = int(statusCode.Int64) if clientIP.Valid { s := clientIP.String @@ -393,12 +370,9 @@ SELECT COALESCE(e.upstream_status_code, e.status_code, 0), COALESCE(e.platform, ''), COALESCE(e.model, ''), - COALESCE(e.is_retryable, false), - COALESCE(e.retry_count, 0), COALESCE(e.resolved, false), e.resolved_at, e.resolved_by_user_id, - e.resolved_retry_id, COALESCE(e.client_request_id, ''), COALESCE(e.request_id, ''), COALESCE(e.error_message, ''), @@ -428,11 +402,7 @@ SELECT e.routing_latency_ms, e.upstream_latency_ms, e.response_latency_ms, - e.time_to_first_token_ms, - COALESCE(e.request_body::text, ''), - e.request_body_truncated, - e.request_body_bytes, - COALESCE(e.request_headers::text, '') + e.time_to_first_token_ms FROM ops_error_logs e LEFT JOIN users u ON e.user_id = u.id LEFT JOIN accounts a ON e.account_id = a.id @@ -445,7 +415,6 @@ LIMIT 1` var upstreamStatusCode sql.NullInt64 var resolvedAt sql.NullTime var resolvedBy sql.NullInt64 - var resolvedRetryID sql.NullInt64 var clientIP sql.NullString var userID sql.NullInt64 var apiKeyID sql.NullInt64 @@ -456,7 +425,6 @@ LIMIT 1` var upstreamLatency sql.NullInt64 var responseLatency sql.NullInt64 var ttft sql.NullInt64 - var requestBodyBytes sql.NullInt64 var requestType sql.NullInt64 err := r.db.QueryRowContext(ctx, q, id).Scan( @@ -470,12 +438,9 @@ LIMIT 1` &statusCode, &out.Platform, &out.Model, - &out.IsRetryable, - &out.RetryCount, &out.Resolved, &resolvedAt, &resolvedBy, - &resolvedRetryID, &out.ClientRequestID, &out.RequestID, &out.Message, @@ -506,10 +471,6 @@ LIMIT 1` &upstreamLatency, &responseLatency, &ttft, - &out.RequestBody, - &out.RequestBodyTruncated, - &requestBodyBytes, - &out.RequestHeaders, ) if err != nil { return nil, err @@ -524,10 +485,6 @@ LIMIT 1` v := resolvedBy.Int64 out.ResolvedByUserID = &v } - if resolvedRetryID.Valid { - v := resolvedRetryID.Int64 - out.ResolvedRetryID = &v - } if clientIP.Valid { s := clientIP.String out.ClientIP = &s @@ -572,25 +529,11 @@ LIMIT 1` v := ttft.Int64 out.TimeToFirstTokenMs = &v } - if requestBodyBytes.Valid { - v := int(requestBodyBytes.Int64) - out.RequestBodyBytes = &v - } if requestType.Valid { v := int16(requestType.Int64) out.RequestType = &v } - // Normalize request_body to empty string when stored as JSON null. - out.RequestBody = strings.TrimSpace(out.RequestBody) - if out.RequestBody == "null" { - out.RequestBody = "" - } - // Normalize request_headers to empty string when stored as JSON null. - out.RequestHeaders = strings.TrimSpace(out.RequestHeaders) - if out.RequestHeaders == "null" { - out.RequestHeaders = "" - } // Normalize upstream_errors to empty string when stored as JSON null. out.UpstreamErrors = strings.TrimSpace(out.UpstreamErrors) if out.UpstreamErrors == "null" { @@ -600,398 +543,7 @@ LIMIT 1` return &out, nil } -func (r *opsRepository) InsertRetryAttempt(ctx context.Context, input *service.OpsInsertRetryAttemptInput) (int64, error) { - if r == nil || r.db == nil { - return 0, fmt.Errorf("nil ops repository") - } - if input == nil { - return 0, fmt.Errorf("nil input") - } - if input.SourceErrorID <= 0 { - return 0, fmt.Errorf("invalid source_error_id") - } - if strings.TrimSpace(input.Mode) == "" { - return 0, fmt.Errorf("invalid mode") - } - - q := ` -INSERT INTO ops_retry_attempts ( - requested_by_user_id, - source_error_id, - mode, - pinned_account_id, - status, - started_at -) VALUES ( - $1,$2,$3,$4,$5,$6 -) RETURNING id` - - var id int64 - err := r.db.QueryRowContext( - ctx, - q, - opsNullInt64(&input.RequestedByUserID), - input.SourceErrorID, - strings.TrimSpace(input.Mode), - opsNullInt64(input.PinnedAccountID), - strings.TrimSpace(input.Status), - input.StartedAt, - ).Scan(&id) - if err != nil { - return 0, err - } - return id, nil -} - -func (r *opsRepository) UpdateRetryAttempt(ctx context.Context, input *service.OpsUpdateRetryAttemptInput) error { - if r == nil || r.db == nil { - return fmt.Errorf("nil ops repository") - } - if input == nil { - return fmt.Errorf("nil input") - } - if input.ID <= 0 { - return fmt.Errorf("invalid id") - } - - q := ` -UPDATE ops_retry_attempts -SET - status = $2, - finished_at = $3, - duration_ms = $4, - success = $5, - http_status_code = $6, - upstream_request_id = $7, - used_account_id = $8, - response_preview = $9, - response_truncated = $10, - result_request_id = $11, - result_error_id = $12, - error_message = $13 -WHERE id = $1` - - _, err := r.db.ExecContext( - ctx, - q, - input.ID, - strings.TrimSpace(input.Status), - nullTime(input.FinishedAt), - input.DurationMs, - nullBool(input.Success), - nullInt(input.HTTPStatusCode), - opsNullString(input.UpstreamRequestID), - nullInt64(input.UsedAccountID), - opsNullString(input.ResponsePreview), - nullBool(input.ResponseTruncated), - opsNullString(input.ResultRequestID), - nullInt64(input.ResultErrorID), - opsNullString(input.ErrorMessage), - ) - return err -} - -func (r *opsRepository) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*service.OpsRetryAttempt, error) { - if r == nil || r.db == nil { - return nil, fmt.Errorf("nil ops repository") - } - if sourceErrorID <= 0 { - return nil, fmt.Errorf("invalid source_error_id") - } - - q := ` -SELECT - id, - created_at, - COALESCE(requested_by_user_id, 0), - source_error_id, - COALESCE(mode, ''), - pinned_account_id, - COALESCE(status, ''), - started_at, - finished_at, - duration_ms, - success, - http_status_code, - upstream_request_id, - used_account_id, - response_preview, - response_truncated, - result_request_id, - result_error_id, - error_message -FROM ops_retry_attempts -WHERE source_error_id = $1 -ORDER BY created_at DESC -LIMIT 1` - - var out service.OpsRetryAttempt - var pinnedAccountID sql.NullInt64 - var requestedBy sql.NullInt64 - var startedAt sql.NullTime - var finishedAt sql.NullTime - var durationMs sql.NullInt64 - var success sql.NullBool - var httpStatusCode sql.NullInt64 - var upstreamRequestID sql.NullString - var usedAccountID sql.NullInt64 - var responsePreview sql.NullString - var responseTruncated sql.NullBool - var resultRequestID sql.NullString - var resultErrorID sql.NullInt64 - var errorMessage sql.NullString - - err := r.db.QueryRowContext(ctx, q, sourceErrorID).Scan( - &out.ID, - &out.CreatedAt, - &requestedBy, - &out.SourceErrorID, - &out.Mode, - &pinnedAccountID, - &out.Status, - &startedAt, - &finishedAt, - &durationMs, - &success, - &httpStatusCode, - &upstreamRequestID, - &usedAccountID, - &responsePreview, - &responseTruncated, - &resultRequestID, - &resultErrorID, - &errorMessage, - ) - if err != nil { - return nil, err - } - out.RequestedByUserID = requestedBy.Int64 - if pinnedAccountID.Valid { - v := pinnedAccountID.Int64 - out.PinnedAccountID = &v - } - if startedAt.Valid { - t := startedAt.Time - out.StartedAt = &t - } - if finishedAt.Valid { - t := finishedAt.Time - out.FinishedAt = &t - } - if durationMs.Valid { - v := durationMs.Int64 - out.DurationMs = &v - } - if success.Valid { - v := success.Bool - out.Success = &v - } - if httpStatusCode.Valid { - v := int(httpStatusCode.Int64) - out.HTTPStatusCode = &v - } - if upstreamRequestID.Valid { - s := upstreamRequestID.String - out.UpstreamRequestID = &s - } - if usedAccountID.Valid { - v := usedAccountID.Int64 - out.UsedAccountID = &v - } - if responsePreview.Valid { - s := responsePreview.String - out.ResponsePreview = &s - } - if responseTruncated.Valid { - v := responseTruncated.Bool - out.ResponseTruncated = &v - } - if resultRequestID.Valid { - s := resultRequestID.String - out.ResultRequestID = &s - } - if resultErrorID.Valid { - v := resultErrorID.Int64 - out.ResultErrorID = &v - } - if errorMessage.Valid { - s := errorMessage.String - out.ErrorMessage = &s - } - - return &out, nil -} - -func nullTime(t time.Time) sql.NullTime { - if t.IsZero() { - return sql.NullTime{} - } - return sql.NullTime{Time: t, Valid: true} -} - -func nullBool(v *bool) sql.NullBool { - if v == nil { - return sql.NullBool{} - } - return sql.NullBool{Bool: *v, Valid: true} -} - -func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*service.OpsRetryAttempt, error) { - if r == nil || r.db == nil { - return nil, fmt.Errorf("nil ops repository") - } - if sourceErrorID <= 0 { - return nil, fmt.Errorf("invalid source_error_id") - } - if limit <= 0 { - limit = 50 - } - if limit > 200 { - limit = 200 - } - - q := ` -SELECT - r.id, - r.created_at, - COALESCE(r.requested_by_user_id, 0), - r.source_error_id, - COALESCE(r.mode, ''), - r.pinned_account_id, - COALESCE(pa.name, ''), - COALESCE(r.status, ''), - r.started_at, - r.finished_at, - r.duration_ms, - r.success, - r.http_status_code, - r.upstream_request_id, - r.used_account_id, - COALESCE(ua.name, ''), - r.response_preview, - r.response_truncated, - r.result_request_id, - r.result_error_id, - r.error_message -FROM ops_retry_attempts r -LEFT JOIN accounts pa ON r.pinned_account_id = pa.id -LEFT JOIN accounts ua ON r.used_account_id = ua.id -WHERE r.source_error_id = $1 -ORDER BY r.created_at DESC -LIMIT $2` - - rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit) - if err != nil { - return nil, err - } - defer func() { _ = rows.Close() }() - - out := make([]*service.OpsRetryAttempt, 0, 16) - for rows.Next() { - var item service.OpsRetryAttempt - var pinnedAccountID sql.NullInt64 - var pinnedAccountName string - var requestedBy sql.NullInt64 - var startedAt sql.NullTime - var finishedAt sql.NullTime - var durationMs sql.NullInt64 - var success sql.NullBool - var httpStatusCode sql.NullInt64 - var upstreamRequestID sql.NullString - var usedAccountID sql.NullInt64 - var usedAccountName string - var responsePreview sql.NullString - var responseTruncated sql.NullBool - var resultRequestID sql.NullString - var resultErrorID sql.NullInt64 - var errorMessage sql.NullString - - if err := rows.Scan( - &item.ID, - &item.CreatedAt, - &requestedBy, - &item.SourceErrorID, - &item.Mode, - &pinnedAccountID, - &pinnedAccountName, - &item.Status, - &startedAt, - &finishedAt, - &durationMs, - &success, - &httpStatusCode, - &upstreamRequestID, - &usedAccountID, - &usedAccountName, - &responsePreview, - &responseTruncated, - &resultRequestID, - &resultErrorID, - &errorMessage, - ); err != nil { - return nil, err - } - - item.RequestedByUserID = requestedBy.Int64 - if pinnedAccountID.Valid { - v := pinnedAccountID.Int64 - item.PinnedAccountID = &v - } - item.PinnedAccountName = pinnedAccountName - if startedAt.Valid { - t := startedAt.Time - item.StartedAt = &t - } - if finishedAt.Valid { - t := finishedAt.Time - item.FinishedAt = &t - } - if durationMs.Valid { - v := durationMs.Int64 - item.DurationMs = &v - } - if success.Valid { - v := success.Bool - item.Success = &v - } - if httpStatusCode.Valid { - v := int(httpStatusCode.Int64) - item.HTTPStatusCode = &v - } - if upstreamRequestID.Valid { - item.UpstreamRequestID = &upstreamRequestID.String - } - if usedAccountID.Valid { - v := usedAccountID.Int64 - item.UsedAccountID = &v - } - item.UsedAccountName = usedAccountName - if responsePreview.Valid { - item.ResponsePreview = &responsePreview.String - } - if responseTruncated.Valid { - v := responseTruncated.Bool - item.ResponseTruncated = &v - } - if resultRequestID.Valid { - item.ResultRequestID = &resultRequestID.String - } - if resultErrorID.Valid { - v := resultErrorID.Int64 - item.ResultErrorID = &v - } - if errorMessage.Valid { - item.ErrorMessage = &errorMessage.String - } - out = append(out, &item) - } - if err := rows.Err(); err != nil { - return nil, err - } - return out, nil -} - -func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error { +func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error { if r == nil || r.db == nil { return fmt.Errorf("nil ops repository") } @@ -1004,8 +556,7 @@ UPDATE ops_error_logs SET resolved = $2, resolved_at = $3, - resolved_by_user_id = $4, - resolved_retry_id = $5 + resolved_by_user_id = $4 WHERE id = $1` at := sql.NullTime{} @@ -1023,7 +574,6 @@ WHERE id = $1` resolved, at, nullInt64(resolvedByUserID), - nullInt64(resolvedRetryID), ) return err } diff --git a/backend/internal/repository/ops_repo_replay_cleanup_test.go b/backend/internal/repository/ops_repo_replay_cleanup_test.go new file mode 100644 index 00000000..a6a15e9a --- /dev/null +++ b/backend/internal/repository/ops_repo_replay_cleanup_test.go @@ -0,0 +1,44 @@ +package repository + +import ( + "reflect" + "strings" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func TestOpsErrorLogInsertDoesNotPersistRequestReplayFields(t *testing.T) { + disallowedColumns := []string{ + "request_body", + "request_headers", + "request_body_truncated", + "request_body_bytes", + "is_retryable", + "retry_count", + "resolved_retry_id", + } + + insertSQL := strings.ToLower(insertOpsErrorLogSQL) + for _, column := range disallowedColumns { + if strings.Contains(insertSQL, column) { + t.Fatalf("ops error log insert still references dropped replay column %q", column) + } + } + + inputType := reflect.TypeOf(service.OpsInsertErrorLogInput{}) + disallowedFields := []string{ + "RequestBodyJSON", + "RequestBodyTruncated", + "RequestBodyBytes", + "RequestHeadersJSON", + "IsRetryable", + "RetryCount", + "ResolvedRetryID", + } + for _, field := range disallowedFields { + if _, ok := inputType.FieldByName(field); ok { + t.Fatalf("OpsInsertErrorLogInput still carries replay field %q", field) + } + } +} diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 92e2f5b6..5a85ba87 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -174,22 +174,17 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) { // Error logs (legacy) ops.GET("/errors", h.Admin.Ops.GetErrorLogs) ops.GET("/errors/:id", h.Admin.Ops.GetErrorLogByID) - ops.GET("/errors/:id/retries", h.Admin.Ops.ListRetryAttempts) - ops.POST("/errors/:id/retry", h.Admin.Ops.RetryErrorRequest) ops.PUT("/errors/:id/resolve", h.Admin.Ops.UpdateErrorResolution) // Request errors (client-visible failures) ops.GET("/request-errors", h.Admin.Ops.ListRequestErrors) ops.GET("/request-errors/:id", h.Admin.Ops.GetRequestError) ops.GET("/request-errors/:id/upstream-errors", h.Admin.Ops.ListRequestErrorUpstreamErrors) - ops.POST("/request-errors/:id/retry-client", h.Admin.Ops.RetryRequestErrorClient) - ops.POST("/request-errors/:id/upstream-errors/:idx/retry", h.Admin.Ops.RetryRequestErrorUpstreamEvent) ops.PUT("/request-errors/:id/resolve", h.Admin.Ops.ResolveRequestError) // Upstream errors (independent upstream failures) ops.GET("/upstream-errors", h.Admin.Ops.ListUpstreamErrors) ops.GET("/upstream-errors/:id", h.Admin.Ops.GetUpstreamError) - ops.POST("/upstream-errors/:id/retry", h.Admin.Ops.RetryUpstreamError) ops.PUT("/upstream-errors/:id/resolve", h.Admin.Ops.ResolveUpstreamError) // Request drilldown (success + error) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index cfae171d..951f324c 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -628,11 +628,6 @@ urlFallbackLoop: return nil, err } - // Capture upstream request body for ops retry of this attempt. - if p.c != nil && len(p.body) > 0 { - p.c.Set(OpsUpstreamRequestBodyKey, string(p.body)) - } - resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency) if err == nil && resp == nil { err = errors.New("upstream returned nil response") diff --git a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go index 87cfc4f8..5cb03f30 100644 --- a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go +++ b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go @@ -188,11 +188,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardStreamPreservesBodyAnd require.NotContains(t, rec.Body.String(), `"cache_read_input_tokens":7`, "透传输出不应被网关改写") require.Equal(t, 7, result.Usage.CacheReadInputTokens, "计费 usage 解析应保留 cached_tokens 兼容") require.Empty(t, rec.Header().Get("Set-Cookie"), "响应头应经过安全过滤") - rawBody, ok := c.Get(OpsUpstreamRequestBodyKey) - require.True(t, ok) - bodyBytes, ok := rawBody.([]byte) - require.True(t, ok, "应以 []byte 形式缓存上游请求体,避免重复 string 拷贝") - require.Equal(t, "claude-3-haiku-20240307", gjson.GetBytes(bodyBytes, "model").String(), "缓存的上游请求体应包含映射后的模型") } func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardCountTokensPreservesBody(t *testing.T) { @@ -938,10 +933,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_UpstreamRequest require.Error(t, err) require.Contains(t, err.Error(), "upstream request failed") require.Equal(t, http.StatusBadGateway, rec.Code) - rawBody, ok := c.Get(OpsUpstreamRequestBodyKey) - require.True(t, ok) - _, ok = rawBody.([]byte) - require.True(t, ok) } func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_EmptyResponseBody(t *testing.T) { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 8180e321..afc19adc 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -4525,9 +4525,6 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400. body = StripEmptyTextBlocks(body) - // 重试间复用同一请求体,避免每次 string(body) 产生额外分配。 - setOpsUpstreamRequestBody(c, body) - // 重试循环 var resp *http.Response retryStart := time.Now() @@ -5018,9 +5015,6 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput( // Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400. input.Body = StripEmptyTextBlocks(input.Body) - // 重试间复用同一请求体,避免每次 string(body) 产生额外分配。 - setOpsUpstreamRequestBody(c, input.Body) - var resp *http.Response retryStart := time.Now() for attempt := 1; attempt <= maxRetryAttempts; attempt++ { @@ -6201,7 +6195,6 @@ func (s *GatewayService) buildUpstreamRequestAnthropicVertex( if err != nil { return nil, err } - setOpsUpstreamRequestBody(c, vertexBody) fullURL, err := buildVertexAnthropicURL(account.VertexProjectID(), account.VertexLocation(modelID), modelID, reqStream) if err != nil { return nil, err diff --git a/backend/internal/service/gemini_chat_completions_compat_service.go b/backend/internal/service/gemini_chat_completions_compat_service.go index 5ea02df5..dcc3213b 100644 --- a/backend/internal/service/gemini_chat_completions_compat_service.go +++ b/backend/internal/service/gemini_chat_completions_compat_service.go @@ -123,10 +123,6 @@ func (s *GeminiMessagesCompatService) forwardClaudeBodyAsChatCompletions( } requestIDHeader = idHeader - if c != nil { - c.Set(OpsUpstreamRequestBodyKey, string(geminiReq)) - } - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { safeErr := sanitizeUpstreamErrorMessage(err.Error()) diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 4d6fa47b..516556ca 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -766,12 +766,6 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex } requestIDHeader = idHeader - // Capture upstream request body for ops retry of this attempt. - if c != nil { - // In this code path `body` is already the JSON sent to upstream. - c.Set(OpsUpstreamRequestBodyKey, string(body)) - } - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { safeErr := sanitizeUpstreamErrorMessage(err.Error()) @@ -1293,12 +1287,6 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. } requestIDHeader = idHeader - // Capture upstream request body for ops retry of this attempt. - if c != nil { - // In this code path `body` is already the JSON sent to upstream. - c.Set(OpsUpstreamRequestBodyKey, string(body)) - } - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { safeErr := sanitizeUpstreamErrorMessage(err.Error()) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 3e09b33e..cfaf5bff 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -2473,9 +2473,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco return nil, err } - // Capture upstream request body for ops retry of this attempt. - setOpsUpstreamRequestBody(c, body) - // 命中 WS 时仅走 WebSocket Mode;不再自动回退 HTTP。 if wsDecision.Transport == OpenAIUpstreamTransportResponsesWebsocketV2 { wsReqBody := reqBody @@ -2748,7 +2745,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco if err != nil { return nil, fmt.Errorf("serialize invalid_encrypted_content retry body: %w", err) } - setOpsUpstreamRequestBody(c, body) httpInvalidEncryptedContentRetryTried = true logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Retrying non-WSv2 request once after invalid_encrypted_content (account: %s)", account.Name) continue @@ -2786,6 +2782,10 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } defer func() { _ = resp.Body.Close() }() + reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel) + serviceTier := extractOpenAIServiceTier(reqBody) + releaseOpenAIParsedRequestBody(c) + // Handle normal response var usage *OpenAIUsage var firstTokenMs *int @@ -2821,9 +2821,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco usage = &OpenAIUsage{} } - reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel) - serviceTier := extractOpenAIServiceTier(reqBody) - forwardResult := &OpenAIForwardResult{ RequestID: resp.Header.Get("x-request-id"), Usage: *usage, @@ -3006,7 +3003,6 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( proxyURL = account.Proxy.URL() } - setOpsUpstreamRequestBody(c, body) if c != nil { c.Set("openai_passthrough", true) } @@ -3045,6 +3041,8 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( return nil, s.handleErrorResponsePassthrough(ctx, resp, c, account, body) } + serviceTier := extractOpenAIServiceTierFromBody(body) + var usage *OpenAIUsage var firstTokenMs *int imageCount := 0 @@ -3081,7 +3079,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( Usage: *usage, Model: reqModel, UpstreamModel: upstreamPassthroughModel, - ServiceTier: extractOpenAIServiceTierFromBody(body), + ServiceTier: serviceTier, ReasoningEffort: reasoningEffort, Stream: reqStream, OpenAIWSMode: false, @@ -6503,6 +6501,13 @@ func getOpenAIRequestBodyMap(c *gin.Context, body []byte) (map[string]any, error return reqBody, nil } +func releaseOpenAIParsedRequestBody(c *gin.Context) { + if c == nil { + return + } + delete(c.Keys, OpenAIParsedRequestBodyKey) +} + func extractOpenAIReasoningEffort(reqBody map[string]any, requestedModel string) *string { if value, present := getOpenAIReasoningEffortFromReqBody(reqBody); present { if value == "" { diff --git a/backend/internal/service/openai_images.go b/backend/internal/service/openai_images.go index 783a44e9..95c054c9 100644 --- a/backend/internal/service/openai_images.go +++ b/backend/internal/service/openai_images.go @@ -588,11 +588,7 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesAPIKey( if err != nil { return nil, err } - if !parsed.Multipart { - setOpsUpstreamRequestBody(c, forwardBody) - } - - upstreamCtx, releaseUpstreamCtx := detachUpstreamContext(ctx) + upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, parsed.Stream) defer releaseUpstreamCtx() token, _, err := s.GetAccessToken(upstreamCtx, account) diff --git a/backend/internal/service/openai_images_responses.go b/backend/internal/service/openai_images_responses.go index 96c2e0d8..c89c2aaf 100644 --- a/backend/internal/service/openai_images_responses.go +++ b/backend/internal/service/openai_images_responses.go @@ -979,8 +979,6 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesOAuth( if err != nil { return nil, err } - setOpsUpstreamRequestBody(c, responsesBody) - upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, responsesBody, token, true, parsed.StickySessionSeed(), false) if err != nil { return nil, err diff --git a/backend/internal/service/ops_cleanup_executor.go b/backend/internal/service/ops_cleanup_executor.go index 63a7367f..d51863c4 100644 --- a/backend/internal/service/ops_cleanup_executor.go +++ b/backend/internal/service/ops_cleanup_executor.go @@ -26,7 +26,6 @@ type opsCleanupTarget struct { type opsCleanupDeletedCounts struct { errorLogs int64 - retryAttempts int64 alertEvents int64 systemLogs int64 logAudits int64 @@ -37,9 +36,8 @@ type opsCleanupDeletedCounts struct { func (c opsCleanupDeletedCounts) String() string { return fmt.Sprintf( - "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", + "error_logs=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", c.errorLogs, - c.retryAttempts, c.alertEvents, c.systemLogs, c.logAudits, diff --git a/backend/internal/service/ops_cleanup_service.go b/backend/internal/service/ops_cleanup_service.go index 60a690f3..f812c290 100644 --- a/backend/internal/service/ops_cleanup_service.go +++ b/backend/internal/service/ops_cleanup_service.go @@ -299,7 +299,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet targets := []opsCleanupTarget{ {effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs}, - {effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts}, {effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents}, {effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs}, {effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits}, diff --git a/backend/internal/service/ops_models.go b/backend/internal/service/ops_models.go index 5fefb74f..ba735346 100644 --- a/backend/internal/service/ops_models.go +++ b/backend/internal/service/ops_models.go @@ -37,14 +37,10 @@ type OpsErrorLog struct { Platform string `json:"platform"` Model string `json:"model"` - IsRetryable bool `json:"is_retryable"` - RetryCount int `json:"retry_count"` - Resolved bool `json:"resolved"` ResolvedAt *time.Time `json:"resolved_at"` ResolvedByUserID *int64 `json:"resolved_by_user_id"` ResolvedByUserName string `json:"resolved_by_user_name"` - ResolvedRetryID *int64 `json:"resolved_retry_id"` ResolvedStatusRaw string `json:"-"` ClientRequestID string `json:"client_request_id"` @@ -89,12 +85,6 @@ type OpsErrorLogDetail struct { ResponseLatencyMs *int64 `json:"response_latency_ms"` TimeToFirstTokenMs *int64 `json:"time_to_first_token_ms"` - // Retry context - RequestBody string `json:"request_body"` - RequestBodyTruncated bool `json:"request_body_truncated"` - RequestBodyBytes *int `json:"request_body_bytes"` - RequestHeaders string `json:"request_headers,omitempty"` - // vNext metric semantics IsBusinessLimited bool `json:"is_business_limited"` } @@ -136,55 +126,3 @@ type OpsErrorLogList struct { Page int `json:"page"` PageSize int `json:"page_size"` } - -type OpsRetryAttempt struct { - ID int64 `json:"id"` - CreatedAt time.Time `json:"created_at"` - - RequestedByUserID int64 `json:"requested_by_user_id"` - SourceErrorID int64 `json:"source_error_id"` - Mode string `json:"mode"` - PinnedAccountID *int64 `json:"pinned_account_id"` - PinnedAccountName string `json:"pinned_account_name"` - - Status string `json:"status"` - StartedAt *time.Time `json:"started_at"` - FinishedAt *time.Time `json:"finished_at"` - DurationMs *int64 `json:"duration_ms"` - - // Persisted execution results (best-effort) - Success *bool `json:"success"` - HTTPStatusCode *int `json:"http_status_code"` - UpstreamRequestID *string `json:"upstream_request_id"` - UsedAccountID *int64 `json:"used_account_id"` - UsedAccountName string `json:"used_account_name"` - ResponsePreview *string `json:"response_preview"` - ResponseTruncated *bool `json:"response_truncated"` - - // Optional correlation - ResultRequestID *string `json:"result_request_id"` - ResultErrorID *int64 `json:"result_error_id"` - - ErrorMessage *string `json:"error_message"` -} - -type OpsRetryResult struct { - AttemptID int64 `json:"attempt_id"` - Mode string `json:"mode"` - Status string `json:"status"` - - PinnedAccountID *int64 `json:"pinned_account_id"` - UsedAccountID *int64 `json:"used_account_id"` - - HTTPStatusCode int `json:"http_status_code"` - UpstreamRequestID string `json:"upstream_request_id"` - - ResponsePreview string `json:"response_preview"` - ResponseTruncated bool `json:"response_truncated"` - - ErrorMessage string `json:"error_message"` - - StartedAt time.Time `json:"started_at"` - FinishedAt time.Time `json:"finished_at"` - DurationMs int64 `json:"duration_ms"` -} diff --git a/backend/internal/service/ops_port.go b/backend/internal/service/ops_port.go index 04bf91c8..30145ed3 100644 --- a/backend/internal/service/ops_port.go +++ b/backend/internal/service/ops_port.go @@ -16,11 +16,7 @@ type OpsRepository interface { DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error - InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) - UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error - GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) - ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) - UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error + UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error // Lightweight window stats (for realtime WS / quick sampling). GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error) @@ -121,51 +117,9 @@ type OpsInsertErrorLogInput struct { ResponseLatencyMs *int64 TimeToFirstTokenMs *int64 - RequestBodyJSON *string // sanitized json string (not raw bytes) - RequestBodyTruncated bool - RequestBodyBytes *int - RequestHeadersJSON *string // optional json string - - IsRetryable bool - RetryCount int - CreatedAt time.Time } -type OpsInsertRetryAttemptInput struct { - RequestedByUserID int64 - SourceErrorID int64 - Mode string - PinnedAccountID *int64 - - // running|queued etc. - Status string - StartedAt time.Time -} - -type OpsUpdateRetryAttemptInput struct { - ID int64 - - // succeeded|failed - Status string - FinishedAt time.Time - DurationMs int64 - - // Persisted execution results (best-effort) - Success *bool - HTTPStatusCode *int - UpstreamRequestID *string - UsedAccountID *int64 - ResponsePreview *string - ResponseTruncated *bool - - // Optional correlation (legacy fields kept) - ResultRequestID *string - ResultErrorID *int64 - - ErrorMessage *string -} - type OpsInsertSystemMetricsInput struct { CreatedAt time.Time WindowMinutes int diff --git a/backend/internal/service/ops_repo_mock_test.go b/backend/internal/service/ops_repo_mock_test.go index c8c66ec6..4138ea77 100644 --- a/backend/internal/service/ops_repo_mock_test.go +++ b/backend/internal/service/ops_repo_mock_test.go @@ -69,23 +69,7 @@ func (m *opsRepoMock) InsertSystemLogCleanupAudit(ctx context.Context, input *Op return nil } -func (m *opsRepoMock) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) { - return 0, nil -} - -func (m *opsRepoMock) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error { - return nil -} - -func (m *opsRepoMock) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) { - return nil, nil -} - -func (m *opsRepoMock) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) { - return []*OpsRetryAttempt{}, nil -} - -func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error { +func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error { return nil } diff --git a/backend/internal/service/ops_retry.go b/backend/internal/service/ops_retry.go deleted file mode 100644 index bd40d389..00000000 --- a/backend/internal/service/ops_retry.go +++ /dev/null @@ -1,726 +0,0 @@ -package service - -import ( - "bytes" - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - "log" - "net/http" - "strings" - "time" - - "github.com/Wei-Shaw/sub2api/internal/domain" - infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" - "github.com/gin-gonic/gin" - "github.com/lib/pq" -) - -const ( - OpsRetryModeClient = "client" - OpsRetryModeUpstream = "upstream" -) - -const ( - opsRetryStatusRunning = "running" - opsRetryStatusSucceeded = "succeeded" - opsRetryStatusFailed = "failed" -) - -const ( - opsRetryTimeout = 60 * time.Second - opsRetryCaptureBytesLimit = 64 * 1024 - opsRetryResponsePreviewMax = 8 * 1024 - opsRetryMinIntervalPerError = 10 * time.Second - opsRetryMaxAccountSwitches = 3 -) - -var opsRetryRequestHeaderAllowlist = map[string]bool{ - "anthropic-beta": true, - "anthropic-version": true, -} - -type opsRetryRequestType string - -const ( - opsRetryTypeMessages opsRetryRequestType = "messages" - opsRetryTypeOpenAI opsRetryRequestType = "openai_responses" - opsRetryTypeGeminiV1B opsRetryRequestType = "gemini_v1beta" -) - -type limitedResponseWriter struct { - header http.Header - wroteHeader bool - - limit int - totalWritten int64 - buf bytes.Buffer -} - -func newLimitedResponseWriter(limit int) *limitedResponseWriter { - if limit <= 0 { - limit = 1 - } - return &limitedResponseWriter{ - header: make(http.Header), - limit: limit, - } -} - -func (w *limitedResponseWriter) Header() http.Header { - return w.header -} - -func (w *limitedResponseWriter) WriteHeader(statusCode int) { - if w.wroteHeader { - return - } - w.wroteHeader = true -} - -func (w *limitedResponseWriter) Write(p []byte) (int, error) { - if !w.wroteHeader { - w.WriteHeader(http.StatusOK) - } - w.totalWritten += int64(len(p)) - - if w.buf.Len() < w.limit { - remaining := w.limit - w.buf.Len() - if len(p) > remaining { - _, _ = w.buf.Write(p[:remaining]) - } else { - _, _ = w.buf.Write(p) - } - } - - // Pretend we wrote everything to avoid upstream/client code treating it as an error. - return len(p), nil -} - -func (w *limitedResponseWriter) Flush() {} - -func (w *limitedResponseWriter) bodyBytes() []byte { - return w.buf.Bytes() -} - -func (w *limitedResponseWriter) truncated() bool { - return w.totalWritten > int64(w.limit) -} - -const ( - OpsRetryModeUpstreamEvent = "upstream_event" -) - -func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) { - if err := s.RequireMonitoringEnabled(ctx); err != nil { - return nil, err - } - if s.opsRepo == nil { - return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") - } - - mode = strings.ToLower(strings.TrimSpace(mode)) - switch mode { - case OpsRetryModeClient, OpsRetryModeUpstream: - default: - return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream") - } - - errorLog, err := s.GetErrorLogByID(ctx, errorID) - if err != nil { - return nil, err - } - if errorLog == nil { - return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found") - } - if strings.TrimSpace(errorLog.RequestBody) == "" { - return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry") - } - - var pinned *int64 - if mode == OpsRetryModeUpstream { - if pinnedAccountID != nil && *pinnedAccountID > 0 { - pinned = pinnedAccountID - } else if errorLog.AccountID != nil && *errorLog.AccountID > 0 { - pinned = errorLog.AccountID - } else { - return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry") - } - } - - return s.retryWithErrorLog(ctx, requestedByUserID, errorID, mode, mode, pinned, errorLog) -} - -// RetryUpstreamEvent retries a specific upstream attempt captured inside ops_error_logs.upstream_errors. -// idx is 0-based. It always pins the original event account_id. -func (s *OpsService) RetryUpstreamEvent(ctx context.Context, requestedByUserID int64, errorID int64, idx int) (*OpsRetryResult, error) { - if err := s.RequireMonitoringEnabled(ctx); err != nil { - return nil, err - } - if s.opsRepo == nil { - return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") - } - if idx < 0 { - return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_UPSTREAM_IDX", "invalid upstream idx") - } - - errorLog, err := s.GetErrorLogByID(ctx, errorID) - if err != nil { - return nil, err - } - if errorLog == nil { - return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found") - } - - events, err := ParseOpsUpstreamErrors(errorLog.UpstreamErrors) - if err != nil { - return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENTS_INVALID", "invalid upstream_errors") - } - if idx >= len(events) { - return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_IDX_OOB", "upstream idx out of range") - } - ev := events[idx] - if ev == nil { - return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENT_MISSING", "upstream event missing") - } - if ev.AccountID <= 0 { - return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry") - } - - upstreamBody := strings.TrimSpace(ev.UpstreamRequestBody) - if upstreamBody == "" { - return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_NO_REQUEST_BODY", "No upstream request body found to retry") - } - - override := *errorLog - override.RequestBody = upstreamBody - pinned := ev.AccountID - - // Persist as upstream_event, execute as upstream pinned retry. - return s.retryWithErrorLog(ctx, requestedByUserID, errorID, OpsRetryModeUpstreamEvent, OpsRetryModeUpstream, &pinned, &override) -} - -func (s *OpsService) retryWithErrorLog(ctx context.Context, requestedByUserID int64, errorID int64, mode string, execMode string, pinnedAccountID *int64, errorLog *OpsErrorLogDetail) (*OpsRetryResult, error) { - latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err) - } - if latest != nil { - if strings.EqualFold(latest.Status, opsRetryStatusRunning) || strings.EqualFold(latest.Status, "queued") { - return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error") - } - - lastAttemptAt := latest.CreatedAt - if latest.FinishedAt != nil && !latest.FinishedAt.IsZero() { - lastAttemptAt = *latest.FinishedAt - } else if latest.StartedAt != nil && !latest.StartedAt.IsZero() { - lastAttemptAt = *latest.StartedAt - } - - if time.Since(lastAttemptAt) < opsRetryMinIntervalPerError { - return nil, infraerrors.Conflict("OPS_RETRY_TOO_FREQUENT", "Please wait before retrying this error again") - } - } - - if errorLog == nil || strings.TrimSpace(errorLog.RequestBody) == "" { - return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry") - } - - var pinned *int64 - if execMode == OpsRetryModeUpstream { - if pinnedAccountID != nil && *pinnedAccountID > 0 { - pinned = pinnedAccountID - } else if errorLog.AccountID != nil && *errorLog.AccountID > 0 { - pinned = errorLog.AccountID - } else { - return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry") - } - } - - startedAt := time.Now() - attemptID, err := s.opsRepo.InsertRetryAttempt(ctx, &OpsInsertRetryAttemptInput{ - RequestedByUserID: requestedByUserID, - SourceErrorID: errorID, - Mode: mode, - PinnedAccountID: pinned, - Status: opsRetryStatusRunning, - StartedAt: startedAt, - }) - if err != nil { - var pqErr *pq.Error - if errors.As(err, &pqErr) && string(pqErr.Code) == "23505" { - return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error") - } - return nil, infraerrors.InternalServer("OPS_RETRY_CREATE_ATTEMPT_FAILED", "Failed to create retry attempt").WithCause(err) - } - - result := &OpsRetryResult{ - AttemptID: attemptID, - Mode: mode, - Status: opsRetryStatusFailed, - PinnedAccountID: pinned, - HTTPStatusCode: 0, - UpstreamRequestID: "", - ResponsePreview: "", - ResponseTruncated: false, - ErrorMessage: "", - StartedAt: startedAt, - } - - execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout) - defer cancel() - - execRes := s.executeRetry(execCtx, errorLog, execMode, pinned) - - finishedAt := time.Now() - result.FinishedAt = finishedAt - result.DurationMs = finishedAt.Sub(startedAt).Milliseconds() - - if execRes != nil { - result.Status = execRes.status - result.UsedAccountID = execRes.usedAccountID - result.HTTPStatusCode = execRes.httpStatusCode - result.UpstreamRequestID = execRes.upstreamRequestID - result.ResponsePreview = execRes.responsePreview - result.ResponseTruncated = execRes.responseTruncated - result.ErrorMessage = execRes.errorMessage - } - - updateCtx, updateCancel := context.WithTimeout(context.Background(), 3*time.Second) - defer updateCancel() - - var updateErrMsg *string - if strings.TrimSpace(result.ErrorMessage) != "" { - msg := result.ErrorMessage - updateErrMsg = &msg - } - // Keep legacy result_request_id empty; use upstream_request_id instead. - var resultRequestID *string - - finalStatus := result.Status - if strings.TrimSpace(finalStatus) == "" { - finalStatus = opsRetryStatusFailed - } - - success := strings.EqualFold(finalStatus, opsRetryStatusSucceeded) - httpStatus := result.HTTPStatusCode - upstreamReqID := result.UpstreamRequestID - usedAccountID := result.UsedAccountID - preview := result.ResponsePreview - truncated := result.ResponseTruncated - - if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{ - ID: attemptID, - Status: finalStatus, - FinishedAt: finishedAt, - DurationMs: result.DurationMs, - Success: &success, - HTTPStatusCode: &httpStatus, - UpstreamRequestID: &upstreamReqID, - UsedAccountID: usedAccountID, - ResponsePreview: &preview, - ResponseTruncated: &truncated, - ResultRequestID: resultRequestID, - ErrorMessage: updateErrMsg, - }); err != nil { - log.Printf("[Ops] UpdateRetryAttempt failed: %v", err) - } else if success { - if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil { - log.Printf("[Ops] UpdateErrorResolution failed: %v", err) - } - } - - return result, nil -} - -type opsRetryExecution struct { - status string - - usedAccountID *int64 - httpStatusCode int - upstreamRequestID string - - responsePreview string - responseTruncated bool - - errorMessage string -} - -func (s *OpsService) executeRetry(ctx context.Context, errorLog *OpsErrorLogDetail, mode string, pinnedAccountID *int64) *opsRetryExecution { - if errorLog == nil { - return &opsRetryExecution{ - status: opsRetryStatusFailed, - errorMessage: "missing error log", - } - } - - reqType := detectOpsRetryType(errorLog.RequestPath) - bodyBytes := []byte(errorLog.RequestBody) - - switch reqType { - case opsRetryTypeMessages: - bodyBytes = FilterThinkingBlocksForRetry(bodyBytes) - case opsRetryTypeOpenAI, opsRetryTypeGeminiV1B: - // No-op - } - - switch strings.ToLower(strings.TrimSpace(mode)) { - case OpsRetryModeUpstream: - if pinnedAccountID == nil || *pinnedAccountID <= 0 { - return &opsRetryExecution{ - status: opsRetryStatusFailed, - errorMessage: "pinned_account_id required for upstream retry", - } - } - return s.executePinnedRetry(ctx, reqType, errorLog, bodyBytes, *pinnedAccountID) - case OpsRetryModeClient: - return s.executeClientRetry(ctx, reqType, errorLog, bodyBytes) - default: - return &opsRetryExecution{ - status: opsRetryStatusFailed, - errorMessage: "invalid retry mode", - } - } -} - -func detectOpsRetryType(path string) opsRetryRequestType { - p := strings.ToLower(strings.TrimSpace(path)) - switch { - case strings.Contains(p, "/responses"), strings.Contains(p, "/images/"): - return opsRetryTypeOpenAI - case strings.Contains(p, "/v1beta/"): - return opsRetryTypeGeminiV1B - default: - return opsRetryTypeMessages - } -} - -func (s *OpsService) executePinnedRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, pinnedAccountID int64) *opsRetryExecution { - if s.accountRepo == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account repository not available"} - } - - account, err := s.accountRepo.GetByID(ctx, pinnedAccountID) - if err != nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("account not found: %v", err)} - } - if account == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account not found"} - } - if !account.IsSchedulable() { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account is not schedulable"} - } - if errorLog.GroupID != nil && *errorLog.GroupID > 0 { - if !containsInt64(account.GroupIDs, *errorLog.GroupID) { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "pinned account is not in the same group as the original request"} - } - } - - var release func() - if s.concurrencyService != nil { - acq, err := s.concurrencyService.AcquireAccountSlot(ctx, account.ID, account.Concurrency) - if err != nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("acquire account slot failed: %v", err)} - } - if acq == nil || !acq.Acquired { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account concurrency limit reached"} - } - release = acq.ReleaseFunc - } - if release != nil { - defer release() - } - - usedID := account.ID - exec := s.executeWithAccount(ctx, reqType, errorLog, body, account) - exec.usedAccountID = &usedID - if exec.status == "" { - exec.status = opsRetryStatusFailed - } - return exec -} - -func (s *OpsService) executeClientRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) *opsRetryExecution { - groupID := errorLog.GroupID - if groupID == nil || *groupID <= 0 { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "group_id missing; cannot reselect account"} - } - - model, stream, parsedErr := extractRetryModelAndStream(reqType, errorLog, body) - if parsedErr != nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: parsedErr.Error()} - } - _ = stream - - excluded := make(map[int64]struct{}) - switches := 0 - - for { - if switches >= opsRetryMaxAccountSwitches { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "retry failed after exhausting account failovers"} - } - - selection, selErr := s.selectAccountForRetry(ctx, reqType, groupID, model, excluded) - if selErr != nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: selErr.Error()} - } - if selection == nil || selection.Account == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: ErrNoAvailableAccounts.Error()} - } - - account := selection.Account - if !selection.Acquired || selection.ReleaseFunc == nil { - excluded[account.ID] = struct{}{} - switches++ - continue - } - - attemptCtx := ctx - if switches > 0 { - attemptCtx = WithAccountSwitchCount(attemptCtx, switches, false) - } - exec := func() *opsRetryExecution { - defer selection.ReleaseFunc() - return s.executeWithAccount(attemptCtx, reqType, errorLog, body, account) - }() - - if exec != nil { - if exec.status == opsRetryStatusSucceeded { - usedID := account.ID - exec.usedAccountID = &usedID - return exec - } - // If the gateway services ask for failover, try another account. - if s.isFailoverError(exec.errorMessage) { - excluded[account.ID] = struct{}{} - switches++ - continue - } - usedID := account.ID - exec.usedAccountID = &usedID - return exec - } - - excluded[account.ID] = struct{}{} - switches++ - } -} - -func (s *OpsService) selectAccountForRetry(ctx context.Context, reqType opsRetryRequestType, groupID *int64, model string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) { - switch reqType { - case opsRetryTypeOpenAI: - if s.openAIGatewayService == nil { - return nil, fmt.Errorf("openai gateway service not available") - } - return s.openAIGatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs) - case opsRetryTypeGeminiV1B, opsRetryTypeMessages: - if s.gatewayService == nil { - return nil, fmt.Errorf("gateway service not available") - } - return s.gatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs, "", int64(0)) // 重试不使用会话限制 - default: - return nil, fmt.Errorf("unsupported retry type: %s", reqType) - } -} - -func extractRetryModelAndStream(reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) (model string, stream bool, err error) { - switch reqType { - case opsRetryTypeMessages: - parsed, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic) - if parseErr != nil { - return "", false, fmt.Errorf("failed to parse messages request body: %w", parseErr) - } - return parsed.Model, parsed.Stream, nil - case opsRetryTypeOpenAI: - var v struct { - Model string `json:"model"` - Stream bool `json:"stream"` - } - if err := json.Unmarshal(body, &v); err != nil { - return "", false, fmt.Errorf("failed to parse openai request body: %w", err) - } - return strings.TrimSpace(v.Model), v.Stream, nil - case opsRetryTypeGeminiV1B: - if strings.TrimSpace(errorLog.Model) == "" { - return "", false, fmt.Errorf("missing model for gemini v1beta retry") - } - return strings.TrimSpace(errorLog.Model), errorLog.Stream, nil - default: - return "", false, fmt.Errorf("unsupported retry type: %s", reqType) - } -} - -func (s *OpsService) executeWithAccount(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, account *Account) *opsRetryExecution { - if account == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "missing account"} - } - - c, w := newOpsRetryContext(ctx, errorLog) - - var err error - switch reqType { - case opsRetryTypeOpenAI: - if s.openAIGatewayService == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "openai gateway service not available"} - } - _, err = s.openAIGatewayService.Forward(ctx, c, account, body) - case opsRetryTypeGeminiV1B: - if s.geminiCompatService == nil || s.antigravityGatewayService == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini services not available"} - } - modelName := strings.TrimSpace(errorLog.Model) - action := "generateContent" - if errorLog.Stream { - action = "streamGenerateContent" - } - if account.Platform == PlatformAntigravity { - _, err = s.antigravityGatewayService.ForwardGemini(ctx, c, account, modelName, action, errorLog.Stream, body, false) - } else { - _, err = s.geminiCompatService.ForwardNative(ctx, c, account, modelName, action, errorLog.Stream, body) - } - case opsRetryTypeMessages: - switch account.Platform { - case PlatformAntigravity: - if s.antigravityGatewayService == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "antigravity gateway service not available"} - } - _, err = s.antigravityGatewayService.Forward(ctx, c, account, body, false) - case PlatformGemini: - if s.geminiCompatService == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini gateway service not available"} - } - _, err = s.geminiCompatService.Forward(ctx, c, account, body) - default: - if s.gatewayService == nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gateway service not available"} - } - parsedReq, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic) - if parseErr != nil { - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "failed to parse request body"} - } - _, err = s.gatewayService.Forward(ctx, c, account, parsedReq) - } - default: - return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "unsupported retry type"} - } - - statusCode := http.StatusOK - if c != nil && c.Writer != nil { - statusCode = c.Writer.Status() - } - - upstreamReqID := extractUpstreamRequestID(c) - preview, truncated := extractResponsePreview(w) - - exec := &opsRetryExecution{ - status: opsRetryStatusFailed, - httpStatusCode: statusCode, - upstreamRequestID: upstreamReqID, - responsePreview: preview, - responseTruncated: truncated, - errorMessage: "", - } - - if err == nil && statusCode < 400 { - exec.status = opsRetryStatusSucceeded - return exec - } - - if err != nil { - exec.errorMessage = err.Error() - } else { - exec.errorMessage = fmt.Sprintf("upstream returned status %d", statusCode) - } - - return exec -} - -func newOpsRetryContext(ctx context.Context, errorLog *OpsErrorLogDetail) (*gin.Context, *limitedResponseWriter) { - w := newLimitedResponseWriter(opsRetryCaptureBytesLimit) - c, _ := gin.CreateTestContext(w) - - path := "/" - if errorLog != nil && strings.TrimSpace(errorLog.RequestPath) != "" { - path = errorLog.RequestPath - } - - req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost"+path, bytes.NewReader(nil)) - req.Header.Set("content-type", "application/json") - if errorLog != nil && strings.TrimSpace(errorLog.UserAgent) != "" { - req.Header.Set("user-agent", errorLog.UserAgent) - } - // Restore a minimal, whitelisted subset of request headers to improve retry fidelity - // (e.g. anthropic-beta / anthropic-version). Never replay auth credentials. - if errorLog != nil && strings.TrimSpace(errorLog.RequestHeaders) != "" { - var stored map[string]string - if err := json.Unmarshal([]byte(errorLog.RequestHeaders), &stored); err == nil { - for k, v := range stored { - key := strings.TrimSpace(k) - if key == "" { - continue - } - if !opsRetryRequestHeaderAllowlist[strings.ToLower(key)] { - continue - } - val := strings.TrimSpace(v) - if val == "" { - continue - } - req.Header.Set(key, val) - } - } - } - - c.Request = req - SetOpenAIClientTransport(c, OpenAIClientTransportHTTP) - return c, w -} - -func extractUpstreamRequestID(c *gin.Context) string { - if c == nil || c.Writer == nil { - return "" - } - h := c.Writer.Header() - if h == nil { - return "" - } - for _, key := range []string{"x-request-id", "X-Request-Id", "X-Request-ID"} { - if v := strings.TrimSpace(h.Get(key)); v != "" { - return v - } - } - return "" -} - -func extractResponsePreview(w *limitedResponseWriter) (preview string, truncated bool) { - if w == nil { - return "", false - } - b := bytes.TrimSpace(w.bodyBytes()) - if len(b) == 0 { - return "", w.truncated() - } - if len(b) > opsRetryResponsePreviewMax { - return string(b[:opsRetryResponsePreviewMax]), true - } - return string(b), w.truncated() -} - -func containsInt64(items []int64, needle int64) bool { - for _, v := range items { - if v == needle { - return true - } - } - return false -} - -func (s *OpsService) isFailoverError(message string) bool { - msg := strings.ToLower(strings.TrimSpace(message)) - if msg == "" { - return false - } - return strings.Contains(msg, "upstream error:") && strings.Contains(msg, "failover") -} diff --git a/backend/internal/service/ops_retry_context_test.go b/backend/internal/service/ops_retry_context_test.go deleted file mode 100644 index a8c26ee4..00000000 --- a/backend/internal/service/ops_retry_context_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package service - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestNewOpsRetryContext_SetsHTTPTransportAndRequestHeaders(t *testing.T) { - errorLog := &OpsErrorLogDetail{ - OpsErrorLog: OpsErrorLog{ - RequestPath: "/openai/v1/responses", - }, - UserAgent: "ops-retry-agent/1.0", - RequestHeaders: `{ - "anthropic-beta":"beta-v1", - "ANTHROPIC-VERSION":"2023-06-01", - "authorization":"Bearer should-not-forward" - }`, - } - - c, w := newOpsRetryContext(context.Background(), errorLog) - require.NotNil(t, c) - require.NotNil(t, w) - require.NotNil(t, c.Request) - - require.Equal(t, "/openai/v1/responses", c.Request.URL.Path) - require.Equal(t, "application/json", c.Request.Header.Get("Content-Type")) - require.Equal(t, "ops-retry-agent/1.0", c.Request.Header.Get("User-Agent")) - require.Equal(t, "beta-v1", c.Request.Header.Get("anthropic-beta")) - require.Equal(t, "2023-06-01", c.Request.Header.Get("anthropic-version")) - require.Empty(t, c.Request.Header.Get("authorization"), "未在白名单内的敏感头不应被重放") - require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c)) -} - -func TestNewOpsRetryContext_InvalidHeadersJSONStillSetsHTTPTransport(t *testing.T) { - errorLog := &OpsErrorLogDetail{ - RequestHeaders: "{invalid-json", - } - - c, _ := newOpsRetryContext(context.Background(), errorLog) - require.NotNil(t, c) - require.NotNil(t, c.Request) - require.Equal(t, "/", c.Request.URL.Path) - require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c)) -} diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 11afc6f9..1cea72fa 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -16,26 +16,9 @@ import ( var ErrOpsDisabled = infraerrors.NotFound("OPS_DISABLED", "Ops monitoring is disabled") const ( - opsMaxStoredRequestBodyBytes = 256 * 1024 - opsMaxStoredErrorBodyBytes = 20 * 1024 + opsMaxStoredErrorBodyBytes = 20 * 1024 ) -// PrepareOpsRequestBodyForQueue 在入队前对请求体执行脱敏与裁剪,返回可直接写入 OpsInsertErrorLogInput 的字段。 -// 该方法用于避免异步队列持有大块原始请求体,减少错误风暴下的内存放大风险。 -func PrepareOpsRequestBodyForQueue(raw []byte) (requestBodyJSON *string, truncated bool, requestBodyBytes *int) { - if len(raw) == 0 { - return nil, false, nil - } - sanitized, truncated, bytesLen := sanitizeAndTrimRequestBody(raw, opsMaxStoredRequestBodyBytes) - if sanitized != "" { - out := sanitized - requestBodyJSON = &out - } - n := bytesLen - requestBodyBytes = &n - return requestBodyJSON, truncated, requestBodyBytes -} - // OpsService provides ingestion and query APIs for the Ops monitoring module. type OpsService struct { opsRepo OpsRepository @@ -138,8 +121,8 @@ func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool { } } -func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error { - prepared, ok, err := s.prepareErrorLogInput(ctx, entry, rawRequestBody) +func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput) error { + prepared, ok, err := s.prepareErrorLogInput(ctx, entry) if err != nil { log.Printf("[Ops] RecordError prepare failed: %v", err) return err @@ -162,7 +145,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE } prepared := make([]*OpsInsertErrorLogInput, 0, len(entries)) for _, entry := range entries { - item, ok, err := s.prepareErrorLogInput(ctx, entry, nil) + item, ok, err := s.prepareErrorLogInput(ctx, entry) if err != nil { log.Printf("[Ops] RecordErrorBatch prepare failed: %v", err) continue @@ -198,7 +181,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE return nil } -func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) (*OpsInsertErrorLogInput, bool, error) { +func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput) (*OpsInsertErrorLogInput, bool, error) { if entry == nil { return nil, false, nil } @@ -224,11 +207,6 @@ func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertE entry.ErrorType = "api_error" } - // Sanitize + trim request body (errors only). - if len(rawRequestBody) > 0 { - entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = PrepareOpsRequestBodyForQueue(rawRequestBody) - } - // Sanitize + truncate error_body to avoid storing sensitive data. if strings.TrimSpace(entry.ErrorBody) != "" { sanitized, _ := sanitizeErrorBodyForStorage(entry.ErrorBody, opsMaxStoredErrorBodyBytes) @@ -315,25 +293,6 @@ func sanitizeOpsUpstreamErrors(entry *OpsInsertErrorLogInput) error { out.Detail = "" } - out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody) - if out.UpstreamRequestBody != "" { - // Reuse the same sanitization/trimming strategy as request body storage. - // Keep it small so it is safe to persist in ops_error_logs JSON. - sanitizedBody, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024) - if sanitizedBody != "" { - out.UpstreamRequestBody = sanitizedBody - if truncated { - out.Kind = strings.TrimSpace(out.Kind) - if out.Kind == "" { - out.Kind = "upstream" - } - out.Kind = out.Kind + ":request_body_truncated" - } - } else { - out.UpstreamRequestBody = "" - } - } - // Drop fully-empty events (can happen if only status code was known). if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" { continue @@ -381,27 +340,7 @@ func (s *OpsService) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLo return detail, nil } -func (s *OpsService) ListRetryAttemptsByErrorID(ctx context.Context, errorID int64, limit int) ([]*OpsRetryAttempt, error) { - if err := s.RequireMonitoringEnabled(ctx); err != nil { - return nil, err - } - if s.opsRepo == nil { - return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") - } - if errorID <= 0 { - return nil, infraerrors.BadRequest("OPS_ERROR_INVALID_ID", "invalid error id") - } - items, err := s.opsRepo.ListRetryAttemptsByErrorID(ctx, errorID, limit) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return []*OpsRetryAttempt{}, nil - } - return nil, infraerrors.InternalServer("OPS_RETRY_LIST_FAILED", "Failed to list retry attempts").WithCause(err) - } - return items, nil -} - -func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64) error { +func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64) error { if err := s.RequireMonitoringEnabled(ctx); err != nil { return err } @@ -418,10 +357,10 @@ func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, r } return infraerrors.InternalServer("OPS_ERROR_LOAD_FAILED", "Failed to load ops error log").WithCause(err) } - return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, resolvedRetryID, nil) + return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, nil) } -func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) { +func sanitizeAndTrimJSONPayload(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) { bytesLen = len(raw) if len(raw) == 0 { return "", false, 0 @@ -429,7 +368,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr var decoded any if err := json.Unmarshal(raw, &decoded); err != nil { - // If it's not valid JSON, don't store (retry would not be reliable anyway). + // If it is not valid JSON, fall back to the caller's non-JSON handling. return "", false, bytesLen } @@ -465,7 +404,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr // This avoids downstream code that expects certain top-level keys from crashing. if root, ok := decoded.(map[string]any); ok { placeholder := shallowCopyMap(root) - placeholder["request_body_truncated"] = true + placeholder["payload_truncated"] = true // Replace potentially huge arrays/strings, but keep the keys present. for _, k := range []string{"messages", "contents", "input", "prompt"} { @@ -488,7 +427,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr } // Final fallback: minimal valid JSON. - encoded4, err4 := json.Marshal(map[string]any{"request_body_truncated": true}) + encoded4, err4 := json.Marshal(map[string]any{"payload_truncated": true}) if err4 != nil { return "", true, bytesLen } @@ -732,7 +671,7 @@ func sanitizeErrorBodyForStorage(raw string, maxBytes int) (sanitized string, tr } // Prefer JSON-safe sanitization when possible. - if out, trunc, _ := sanitizeAndTrimRequestBody([]byte(raw), maxBytes); out != "" { + if out, trunc, _ := sanitizeAndTrimJSONPayload([]byte(raw), maxBytes); out != "" { return out, trunc } diff --git a/backend/internal/service/ops_service_batch_test.go b/backend/internal/service/ops_service_batch_test.go index f3a14d7f..a9419ad7 100644 --- a/backend/internal/service/ops_service_batch_test.go +++ b/backend/internal/service/ops_service_batch_test.go @@ -31,11 +31,10 @@ func TestOpsServiceRecordErrorBatch_SanitizesAndBatches(t *testing.T) { UpstreamErrorDetail: strPtr(detail), UpstreamErrors: []*OpsUpstreamErrorEvent{ { - AccountID: -2, - UpstreamStatusCode: 429, - Message: " token leaked ", - Detail: `{"refresh_token":"secret"}`, - UpstreamRequestBody: `{"api_key":"secret","messages":[{"role":"user","content":"hello"}]}`, + AccountID: -2, + UpstreamStatusCode: 429, + Message: " token leaked ", + Detail: `{"refresh_token":"secret"}`, }, }, }, diff --git a/backend/internal/service/ops_service_prepare_queue_test.go b/backend/internal/service/ops_service_prepare_queue_test.go deleted file mode 100644 index d6f32c2d..00000000 --- a/backend/internal/service/ops_service_prepare_queue_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package service - -import ( - "encoding/json" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestPrepareOpsRequestBodyForQueue_EmptyBody(t *testing.T) { - requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(nil) - require.Nil(t, requestBodyJSON) - require.False(t, truncated) - require.Nil(t, requestBodyBytes) -} - -func TestPrepareOpsRequestBodyForQueue_InvalidJSON(t *testing.T) { - raw := []byte("{invalid-json") - requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw) - require.Nil(t, requestBodyJSON) - require.False(t, truncated) - require.NotNil(t, requestBodyBytes) - require.Equal(t, len(raw), *requestBodyBytes) -} - -func TestPrepareOpsRequestBodyForQueue_RedactSensitiveFields(t *testing.T) { - raw := []byte(`{ - "model":"claude-3-5-sonnet-20241022", - "api_key":"sk-test-123", - "headers":{"authorization":"Bearer secret-token"}, - "messages":[{"role":"user","content":"hello"}] - }`) - - requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw) - require.NotNil(t, requestBodyJSON) - require.NotNil(t, requestBodyBytes) - require.False(t, truncated) - require.Equal(t, len(raw), *requestBodyBytes) - - var body map[string]any - require.NoError(t, json.Unmarshal([]byte(*requestBodyJSON), &body)) - require.Equal(t, "[REDACTED]", body["api_key"]) - headers, ok := body["headers"].(map[string]any) - require.True(t, ok) - require.Equal(t, "[REDACTED]", headers["authorization"]) -} - -func TestPrepareOpsRequestBodyForQueue_LargeBodyTruncated(t *testing.T) { - largeMsg := strings.Repeat("x", opsMaxStoredRequestBodyBytes*2) - raw := []byte(`{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"` + largeMsg + `"}]}`) - - requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw) - require.NotNil(t, requestBodyJSON) - require.NotNil(t, requestBodyBytes) - require.True(t, truncated) - require.Equal(t, len(raw), *requestBodyBytes) - require.LessOrEqual(t, len(*requestBodyJSON), opsMaxStoredRequestBodyBytes) - require.Contains(t, *requestBodyJSON, "request_body_truncated") -} diff --git a/backend/internal/service/ops_service_redaction_test.go b/backend/internal/service/ops_service_redaction_test.go index e0aeafa5..72b85ff0 100644 --- a/backend/internal/service/ops_service_redaction_test.go +++ b/backend/internal/service/ops_service_redaction_test.go @@ -45,11 +45,11 @@ func TestIsSensitiveKey_TokenBudgetKeysNotRedacted(t *testing.T) { } } -func TestSanitizeAndTrimRequestBody_PreservesTokenBudgetFields(t *testing.T) { +func TestSanitizeAndTrimJSONPayload_PreservesTokenBudgetFields(t *testing.T) { t.Parallel() raw := []byte(`{"model":"claude-3","max_tokens":123,"thinking":{"type":"enabled","budget_tokens":456},"access_token":"abc","messages":[{"role":"user","content":"hi"}]}`) - out, _, _ := sanitizeAndTrimRequestBody(raw, 10*1024) + out, _, _ := sanitizeAndTrimJSONPayload(raw, 10*1024) if out == "" { t.Fatalf("expected non-empty sanitized output") } diff --git a/backend/internal/service/ops_upstream_context.go b/backend/internal/service/ops_upstream_context.go index 5c8ac5a6..b4ff0e74 100644 --- a/backend/internal/service/ops_upstream_context.go +++ b/backend/internal/service/ops_upstream_context.go @@ -16,11 +16,6 @@ const ( OpsUpstreamErrorDetailKey = "ops_upstream_error_detail" OpsUpstreamErrorsKey = "ops_upstream_errors" - // Best-effort capture of the current upstream request body so ops can - // retry the specific upstream attempt (not just the client request). - // This value is sanitized+trimmed before being persisted. - OpsUpstreamRequestBodyKey = "ops_upstream_request_body" - // Optional stage latencies (milliseconds) for troubleshooting and alerting. OpsAuthLatencyMsKey = "ops_auth_latency_ms" OpsRoutingLatencyMsKey = "ops_routing_latency_ms" @@ -44,14 +39,6 @@ const ( OpsClientBusinessLimitedReasonIPRestriction = "api_key_ip_restriction" ) -func setOpsUpstreamRequestBody(c *gin.Context, body []byte) { - if c == nil || len(body) == 0 { - return - } - // 热路径避免 string(body) 额外分配,按需在落库前再转换。 - c.Set(OpsUpstreamRequestBodyKey, body) -} - func SetOpsLatencyMs(c *gin.Context, key string, value int64) { if c == nil || strings.TrimSpace(key) == "" || value < 0 { return @@ -125,10 +112,6 @@ type OpsUpstreamErrorEvent struct { // Helps debug 404/routing errors by showing which endpoint was targeted. UpstreamURL string `json:"upstream_url,omitempty"` - // Best-effort upstream request capture (sanitized+trimmed). - // Required for retrying a specific upstream attempt. - UpstreamRequestBody string `json:"upstream_request_body,omitempty"` - // Best-effort upstream response capture (sanitized+trimmed). UpstreamResponseBody string `json:"upstream_response_body,omitempty"` @@ -148,7 +131,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) { } ev.Platform = strings.TrimSpace(ev.Platform) ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID) - ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody) ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody) ev.Kind = strings.TrimSpace(ev.Kind) ev.UpstreamURL = strings.TrimSpace(ev.UpstreamURL) @@ -158,19 +140,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) { ev.Message = sanitizeUpstreamErrorMessage(ev.Message) } - // If the caller didn't explicitly pass upstream request body but the gateway - // stored it on the context, attach it so ops can retry this specific attempt. - if ev.UpstreamRequestBody == "" { - if v, ok := c.Get(OpsUpstreamRequestBodyKey); ok { - switch raw := v.(type) { - case string: - ev.UpstreamRequestBody = strings.TrimSpace(raw) - case []byte: - ev.UpstreamRequestBody = strings.TrimSpace(string(raw)) - } - } - } - var existing []*OpsUpstreamErrorEvent if v, ok := c.Get(OpsUpstreamErrorsKey); ok { if arr, ok := v.([]*OpsUpstreamErrorEvent); ok { diff --git a/backend/internal/service/ops_upstream_context_test.go b/backend/internal/service/ops_upstream_context_test.go index fa6d1085..711223f4 100644 --- a/backend/internal/service/ops_upstream_context_test.go +++ b/backend/internal/service/ops_upstream_context_test.go @@ -1,10 +1,8 @@ package service import ( - "net/http/httptest" "testing" - "github.com/gin-gonic/gin" "github.com/stretchr/testify/require" ) @@ -28,41 +26,3 @@ func TestSafeUpstreamURL(t *testing.T) { }) } } - -func TestAppendOpsUpstreamError_UsesRequestBodyBytesFromContext(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - setOpsUpstreamRequestBody(c, []byte(`{"model":"gpt-5"}`)) - appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ - Kind: "http_error", - Message: "upstream failed", - }) - - v, ok := c.Get(OpsUpstreamErrorsKey) - require.True(t, ok) - events, ok := v.([]*OpsUpstreamErrorEvent) - require.True(t, ok) - require.Len(t, events, 1) - require.Equal(t, `{"model":"gpt-5"}`, events[0].UpstreamRequestBody) -} - -func TestAppendOpsUpstreamError_UsesRequestBodyStringFromContext(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - c.Set(OpsUpstreamRequestBodyKey, `{"model":"gpt-4"}`) - appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ - Kind: "request_error", - Message: "dial timeout", - }) - - v, ok := c.Get(OpsUpstreamErrorsKey) - require.True(t, ok) - events, ok := v.([]*OpsUpstreamErrorEvent) - require.True(t, ok) - require.Len(t, events, 1) - require.Equal(t, `{"model":"gpt-4"}`, events[0].UpstreamRequestBody) -} diff --git a/backend/internal/service/slice_helpers.go b/backend/internal/service/slice_helpers.go new file mode 100644 index 00000000..4894f5aa --- /dev/null +++ b/backend/internal/service/slice_helpers.go @@ -0,0 +1,10 @@ +package service + +func containsInt64(values []int64, target int64) bool { + for _, v := range values { + if v == target { + return true + } + } + return false +} diff --git a/backend/migrations/136_remove_ops_retry_replay.sql b/backend/migrations/136_remove_ops_retry_replay.sql new file mode 100644 index 00000000..12ecc77d --- /dev/null +++ b/backend/migrations/136_remove_ops_retry_replay.sql @@ -0,0 +1,16 @@ +-- Remove unused Ops retry/replay storage. +-- The retry endpoints are no longer exposed, so keeping request bodies and +-- retry audit rows only increases write width, memory retention, and DB size. + +DROP TABLE IF EXISTS ops_retry_attempts CASCADE; + +ALTER TABLE ops_error_logs + DROP COLUMN IF EXISTS request_body, + DROP COLUMN IF EXISTS request_headers, + DROP COLUMN IF EXISTS request_body_truncated, + DROP COLUMN IF EXISTS request_body_bytes, + DROP COLUMN IF EXISTS is_retryable, + DROP COLUMN IF EXISTS retry_count, + DROP COLUMN IF EXISTS resolved_retry_id; + +COMMENT ON TABLE ops_error_logs IS 'Ops error logs (vNext). Stores sanitized error details; request replay storage removed.';