Merge pull request #2581 from is7Qin/fix/ops-body-memory-retention

降低大请求体场景下的 Ops 错误日志内存放大
This commit is contained in:
Wesley Liddick 2026-05-20 08:40:24 +08:00 committed by GitHub
commit 8e77241386
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 138 additions and 2198 deletions

View File

@ -1,9 +1,7 @@
package admin
import (
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
@ -384,79 +382,6 @@ func (h *OpsHandler) ListRequestErrorUpstreamErrors(c *gin.Context) {
response.Paginated(c, result.Errors, int64(result.Total), result.Page, result.PageSize)
}
// RetryRequestErrorClient retries the client request based on stored request body.
// POST /api/v1/admin/ops/request-errors/:id/retry-client
func (h *OpsHandler) RetryRequestErrorClient(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
subject, ok := middleware.GetAuthSubjectFromContext(c)
if !ok || subject.UserID <= 0 {
response.Error(c, http.StatusUnauthorized, "Unauthorized")
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeClient, nil)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, result)
}
// RetryRequestErrorUpstreamEvent retries a specific upstream attempt using captured upstream_request_body.
// POST /api/v1/admin/ops/request-errors/:id/upstream-errors/:idx/retry
func (h *OpsHandler) RetryRequestErrorUpstreamEvent(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
subject, ok := middleware.GetAuthSubjectFromContext(c)
if !ok || subject.UserID <= 0 {
response.Error(c, http.StatusUnauthorized, "Unauthorized")
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
idxStr := strings.TrimSpace(c.Param("idx"))
idx, err := strconv.Atoi(idxStr)
if err != nil || idx < 0 {
response.BadRequest(c, "Invalid upstream idx")
return
}
result, err := h.opsService.RetryUpstreamEvent(c.Request.Context(), subject.UserID, id, idx)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, result)
}
// ResolveRequestError toggles resolved status.
// PUT /api/v1/admin/ops/request-errors/:id/resolve
func (h *OpsHandler) ResolveRequestError(c *gin.Context) {
@ -564,39 +489,6 @@ func (h *OpsHandler) GetUpstreamError(c *gin.Context) {
h.GetErrorLogByID(c)
}
// RetryUpstreamError retries upstream error using the original account_id.
// POST /api/v1/admin/ops/upstream-errors/:id/retry
func (h *OpsHandler) RetryUpstreamError(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
subject, ok := middleware.GetAuthSubjectFromContext(c)
if !ok || subject.UserID <= 0 {
response.Error(c, http.StatusUnauthorized, "Unauthorized")
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeUpstream, nil)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, result)
}
// ResolveUpstreamError toggles resolved status.
// PUT /api/v1/admin/ops/upstream-errors/:id/resolve
func (h *OpsHandler) ResolveUpstreamError(c *gin.Context) {
@ -706,106 +598,10 @@ func (h *OpsHandler) ListRequestDetails(c *gin.Context) {
response.Paginated(c, out.Items, out.Total, out.Page, out.PageSize)
}
type opsRetryRequest struct {
Mode string `json:"mode"`
PinnedAccountID *int64 `json:"pinned_account_id"`
Force bool `json:"force"`
}
type opsResolveRequest struct {
Resolved bool `json:"resolved"`
}
// RetryErrorRequest retries a failed request using stored request_body.
// POST /api/v1/admin/ops/errors/:id/retry
func (h *OpsHandler) RetryErrorRequest(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
subject, ok := middleware.GetAuthSubjectFromContext(c)
if !ok || subject.UserID <= 0 {
response.Error(c, http.StatusUnauthorized, "Unauthorized")
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
req := opsRetryRequest{Mode: service.OpsRetryModeClient}
if err := c.ShouldBindJSON(&req); err != nil && !errors.Is(err, io.EOF) {
response.BadRequest(c, "Invalid request: "+err.Error())
return
}
if strings.TrimSpace(req.Mode) == "" {
req.Mode = service.OpsRetryModeClient
}
// Force flag is currently a UI-level acknowledgement. Server may still enforce safety constraints.
_ = req.Force
// Legacy endpoint safety: only allow retrying the client request here.
// Upstream retries must go through the split endpoints.
if strings.EqualFold(strings.TrimSpace(req.Mode), service.OpsRetryModeUpstream) {
response.BadRequest(c, "upstream retry is not supported on this endpoint")
return
}
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, req.Mode, req.PinnedAccountID)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, result)
}
// ListRetryAttempts lists retry attempts for an error log.
// GET /api/v1/admin/ops/errors/:id/retries
func (h *OpsHandler) ListRetryAttempts(c *gin.Context) {
if h.opsService == nil {
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
return
}
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
idStr := strings.TrimSpace(c.Param("id"))
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
response.BadRequest(c, "Invalid error id")
return
}
limit := 50
if v := strings.TrimSpace(c.Query("limit")); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n <= 0 {
response.BadRequest(c, "Invalid limit")
return
}
limit = n
}
items, err := h.opsService.ListRetryAttemptsByErrorID(c.Request.Context(), id, limit)
if err != nil {
response.ErrorFrom(c, err)
return
}
response.Success(c, items)
}
// UpdateErrorResolution allows manual resolve/unresolve.
// PUT /api/v1/admin/ops/errors/:id/resolve
func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) {
@ -837,7 +633,7 @@ func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) {
return
}
uid := subject.UserID
if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid, nil); err != nil {
if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid); err != nil {
response.ErrorFrom(c, err)
return
}

View File

@ -151,7 +151,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
if err != nil {
@ -184,7 +184,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
c.Request = c.Request.WithContext(service.WithThinkingEnabled(c.Request.Context(), parsedReq.ThinkingEnabled, h.metadataBridgeEnabled()))
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
// 验证 model 必填
@ -1512,7 +1512,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
if err != nil {
@ -1531,7 +1531,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
return
}
setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream, body)
setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsedReq.Stream, false)))
// 获取订阅信息可能为nil

View File

@ -60,7 +60,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
// Validate JSON
if !gjson.ValidBytes(body) {
@ -78,7 +78,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
reqStream := gjson.GetBytes(body, "stream").Bool()
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
// 解析渠道级模型映射

View File

@ -60,7 +60,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
// Validate JSON
if !gjson.ValidBytes(body) {
@ -78,7 +78,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
reqStream := gjson.GetBytes(body, "stream").Bool()
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
// 解析渠道级模型映射

View File

@ -184,7 +184,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
return
}
setOpsRequestContext(c, modelName, stream, body)
setOpsRequestContext(c, modelName, stream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(stream, false)))
if decision := h.checkContentModeration(c, reqLog, apiKey, authSubject, service.ContentModerationProtocolGemini, modelName, body); decision != nil && decision.Blocked {

View File

@ -78,7 +78,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIChat, reqModel, body); decision != nil && decision.Blocked {

View File

@ -130,7 +130,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
return
}
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
sessionHashBody := body
if service.IsOpenAIResponsesCompactPathForTest(c) {
if compactSeed := strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String()); compactSeed != "" {
@ -189,7 +189,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
return
}
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, body); decision != nil && decision.Blocked {
@ -611,7 +611,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
setOpsRequestContext(c, reqModel, reqStream, body)
setOpsRequestContext(c, reqModel, reqStream)
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolAnthropicMessages, reqModel, body); decision != nil && decision.Blocked {
@ -1174,7 +1174,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) {
zap.Bool("has_previous_response_id", previousResponseID != ""),
zap.String("previous_response_id_kind", previousResponseIDKind),
)
setOpsRequestContext(c, reqModel, true, firstMessage)
setOpsRequestContext(c, reqModel, true)
setOpsEndpointContext(c, "", int16(service.RequestTypeWSV2))
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, firstMessage); decision != nil && decision.Blocked {

View File

@ -63,9 +63,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
}
if isMultipartImagesContentType(c.GetHeader("Content-Type")) {
setOpsRequestContext(c, "", false, nil)
setOpsRequestContext(c, "", false)
} else {
setOpsRequestContext(c, "", false, body)
setOpsRequestContext(c, "", false)
}
parsed, err := h.gatewayService.ParseOpenAIImagesRequest(c, body)
@ -98,9 +98,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
}
if parsed.Multipart {
setOpsRequestContext(c, parsed.Model, parsed.Stream, nil)
setOpsRequestContext(c, parsed.Model, parsed.Stream)
} else {
setOpsRequestContext(c, parsed.Model, parsed.Stream, body)
setOpsRequestContext(c, parsed.Model, parsed.Stream)
}
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsed.Stream, false)))

View File

@ -25,7 +25,6 @@ import (
const (
opsModelKey = "ops_model"
opsStreamKey = "ops_stream"
opsRequestBodyKey = "ops_request_body"
opsAccountIDKey = "ops_account_id"
opsRoutingCapacityLimitedKey = "ops_routing_capacity_limited"
@ -336,16 +335,13 @@ func opsErrorLogConfig() (workerCount int, queueSize int) {
return workerCount, queueSize
}
func setOpsRequestContext(c *gin.Context, model string, stream bool, requestBody []byte) {
func setOpsRequestContext(c *gin.Context, model string, stream bool) {
if c == nil {
return
}
model = strings.TrimSpace(model)
c.Set(opsModelKey, model)
c.Set(opsStreamKey, stream)
if len(requestBody) > 0 {
c.Set(opsRequestBodyKey, requestBody)
}
if c.Request != nil && model != "" {
ctx := context.WithValue(c.Request.Context(), ctxkey.Model, model)
c.Request = c.Request.WithContext(ctx)
@ -364,22 +360,6 @@ func setOpsEndpointContext(c *gin.Context, upstreamModel string, requestType int
c.Set(opsRequestTypeKey, requestType)
}
func attachOpsRequestBodyToEntry(c *gin.Context, entry *service.OpsInsertErrorLogInput) {
if c == nil || entry == nil {
return
}
v, ok := c.Get(opsRequestBodyKey)
if !ok {
return
}
raw, ok := v.([]byte)
if !ok || len(raw) == 0 {
return
}
entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = service.PrepareOpsRequestBodyForQueue(raw)
opsErrorLogSanitized.Add(1)
}
func setOpsSelectedAccount(c *gin.Context, accountID int64, platform ...string) {
if c == nil || accountID <= 0 {
return
@ -711,7 +691,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
ErrorPhase: "upstream",
ErrorType: "upstream_error",
// Severity/retryability should reflect the upstream failure, not the final client status (200).
// Severity should reflect the upstream failure, not the final client status (200).
Severity: classifyOpsSeverity("upstream_error", effectiveUpstreamStatus),
StatusCode: status,
IsBusinessLimited: false,
@ -728,9 +708,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
UpstreamErrorDetail: upstreamErrorDetail,
UpstreamErrors: events,
IsRetryable: classifyOpsIsRetryable("upstream_error", effectiveUpstreamStatus),
RetryCount: 0,
CreatedAt: time.Now(),
CreatedAt: time.Now(),
}
applyOpsLatencyFieldsFromContext(c, entry)
@ -754,10 +732,6 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
entry.ClientIP = &clientIP
}
// Store request headers/body only when an upstream error occurred to keep overhead minimal.
entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c)
attachOpsRequestBodyToEntry(c, entry)
// Skip logging if a passthrough rule with skip_monitoring=true matched.
if v, ok := c.Get(service.OpsSkipPassthroughKey); ok {
if skip, _ := v.(bool); skip {
@ -870,9 +844,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
ErrorSource: errorSource,
ErrorOwner: errorOwner,
IsRetryable: classifyOpsIsRetryable(normalizedType, status),
RetryCount: 0,
CreatedAt: time.Now(),
CreatedAt: time.Now(),
}
applyOpsLatencyFieldsFromContext(c, entry)
@ -950,20 +922,10 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
entry.ClientIP = &clientIP
}
// Persist only a minimal, whitelisted set of request headers to improve retry fidelity.
// Do NOT store Authorization/Cookie/etc.
entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c)
attachOpsRequestBodyToEntry(c, entry)
enqueueOpsErrorLog(ops, entry)
}
}
var opsRetryRequestHeaderAllowlist = []string{
"anthropic-beta",
"anthropic-version",
}
// isCountTokensRequest checks if the request is a count_tokens request
func isCountTokensRequest(c *gin.Context) bool {
if c == nil || c.Request == nil || c.Request.URL == nil {
@ -972,32 +934,6 @@ func isCountTokensRequest(c *gin.Context) bool {
return strings.Contains(c.Request.URL.Path, "/count_tokens")
}
func extractOpsRetryRequestHeaders(c *gin.Context) *string {
if c == nil || c.Request == nil {
return nil
}
headers := make(map[string]string, 4)
for _, key := range opsRetryRequestHeaderAllowlist {
v := strings.TrimSpace(c.GetHeader(key))
if v == "" {
continue
}
// Keep headers small even if a client sends something unexpected.
headers[key] = truncateString(v, 512)
}
if len(headers) == 0 {
return nil
}
raw, err := json.Marshal(headers)
if err != nil {
return nil
}
s := string(raw)
return &s
}
func applyOpsLatencyFieldsFromContext(c *gin.Context, entry *service.OpsInsertErrorLogInput) {
if c == nil || entry == nil {
return
@ -1199,24 +1135,6 @@ func classifyOpsSeverity(errType string, status int) string {
return "P3"
}
func classifyOpsIsRetryable(errType string, statusCode int) bool {
switch errType {
case "authentication_error", "invalid_request_error":
return false
case "timeout_error":
return true
case "rate_limit_error":
// May be transient (upstream or queue); retry can help.
return true
case "billing_error", "subscription_error":
return false
case "upstream_error", "overloaded_error":
return statusCode >= 500 || statusCode == 429 || statusCode == 529
default:
return statusCode >= 500
}
}
func classifyOpsErrorLog(c *gin.Context, errType, message, code string, status int) (phase string, isBusinessLimited bool, errorOwner string, errorSource string) {
phase = classifyOpsPhase(errType, message, code)
routingCapacityLimited := isOpsRoutingCapacityLimited(c)

View File

@ -44,49 +44,6 @@ func resetOpsErrorLoggerStateForTest(t *testing.T) {
opsErrorLogDrained.Store(false)
}
func TestAttachOpsRequestBodyToEntry_SanitizeAndTrim(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
raw := []byte(`{"access_token":"secret-token","messages":[{"role":"user","content":"hello"}]}`)
setOpsRequestContext(c, "claude-3", false, raw)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(c, entry)
require.NotNil(t, entry.RequestBodyBytes)
require.Equal(t, len(raw), *entry.RequestBodyBytes)
require.NotNil(t, entry.RequestBodyJSON)
require.NotContains(t, *entry.RequestBodyJSON, "secret-token")
require.Contains(t, *entry.RequestBodyJSON, "[REDACTED]")
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
}
func TestAttachOpsRequestBodyToEntry_InvalidJSONKeepsSize(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
raw := []byte("not-json")
setOpsRequestContext(c, "claude-3", false, raw)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.NotNil(t, entry.RequestBodyBytes)
require.Equal(t, len(raw), *entry.RequestBodyBytes)
require.False(t, entry.RequestBodyTruncated)
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
}
func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
@ -108,39 +65,6 @@ func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) {
require.Equal(t, int64(1), OpsErrorLogQueueLength())
}
func TestAttachOpsRequestBodyToEntry_EarlyReturnBranches(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(nil, entry)
attachOpsRequestBodyToEntry(&gin.Context{}, nil)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
// 无请求体 key
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
require.False(t, entry.RequestBodyTruncated)
// 错误类型
c.Set(opsRequestBodyKey, "not-bytes")
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
// 空 bytes
c.Set(opsRequestBodyKey, []byte{})
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
require.Equal(t, int64(0), OpsErrorLogSanitizedTotal())
}
func TestEnqueueOpsErrorLog_EarlyReturnBranches(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)

View File

@ -54,15 +54,9 @@ INSERT INTO ops_error_logs (
upstream_latency_ms,
response_latency_ms,
time_to_first_token_ms,
request_body,
request_body_truncated,
request_body_bytes,
request_headers,
is_retryable,
retry_count,
created_at
) VALUES (
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39,$40,$41,$42,$43
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37
)`
func NewOpsRepository(db *sql.DB) service.OpsRepository {
@ -170,12 +164,6 @@ func opsInsertErrorLogArgs(input *service.OpsInsertErrorLogInput) []any {
opsNullInt64(input.UpstreamLatencyMs),
opsNullInt64(input.ResponseLatencyMs),
opsNullInt64(input.TimeToFirstTokenMs),
opsNullString(input.RequestBodyJSON),
input.RequestBodyTruncated,
opsNullInt(input.RequestBodyBytes),
opsNullString(input.RequestHeadersJSON),
input.IsRetryable,
input.RetryCount,
input.CreatedAt,
}
}
@ -222,13 +210,10 @@ SELECT
COALESCE(e.upstream_status_code, e.status_code, 0),
COALESCE(e.platform, ''),
COALESCE(e.model, ''),
COALESCE(e.is_retryable, false),
COALESCE(e.retry_count, 0),
COALESCE(e.resolved, false),
e.resolved_at,
e.resolved_by_user_id,
COALESCE(u2.email, ''),
e.resolved_retry_id,
COALESCE(e.client_request_id, ''),
COALESCE(e.request_id, ''),
COALESCE(e.error_message, ''),
@ -277,7 +262,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
var resolvedAt sql.NullTime
var resolvedBy sql.NullInt64
var resolvedByName string
var resolvedRetryID sql.NullInt64
var requestType sql.NullInt64
if err := rows.Scan(
&item.ID,
@ -290,13 +274,10 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
&statusCode,
&item.Platform,
&item.Model,
&item.IsRetryable,
&item.RetryCount,
&item.Resolved,
&resolvedAt,
&resolvedBy,
&resolvedByName,
&resolvedRetryID,
&item.ClientRequestID,
&item.RequestID,
&item.Message,
@ -327,10 +308,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
item.ResolvedByUserID = &v
}
item.ResolvedByUserName = resolvedByName
if resolvedRetryID.Valid {
v := resolvedRetryID.Int64
item.ResolvedRetryID = &v
}
item.StatusCode = int(statusCode.Int64)
if clientIP.Valid {
s := clientIP.String
@ -393,12 +370,9 @@ SELECT
COALESCE(e.upstream_status_code, e.status_code, 0),
COALESCE(e.platform, ''),
COALESCE(e.model, ''),
COALESCE(e.is_retryable, false),
COALESCE(e.retry_count, 0),
COALESCE(e.resolved, false),
e.resolved_at,
e.resolved_by_user_id,
e.resolved_retry_id,
COALESCE(e.client_request_id, ''),
COALESCE(e.request_id, ''),
COALESCE(e.error_message, ''),
@ -428,11 +402,7 @@ SELECT
e.routing_latency_ms,
e.upstream_latency_ms,
e.response_latency_ms,
e.time_to_first_token_ms,
COALESCE(e.request_body::text, ''),
e.request_body_truncated,
e.request_body_bytes,
COALESCE(e.request_headers::text, '')
e.time_to_first_token_ms
FROM ops_error_logs e
LEFT JOIN users u ON e.user_id = u.id
LEFT JOIN accounts a ON e.account_id = a.id
@ -445,7 +415,6 @@ LIMIT 1`
var upstreamStatusCode sql.NullInt64
var resolvedAt sql.NullTime
var resolvedBy sql.NullInt64
var resolvedRetryID sql.NullInt64
var clientIP sql.NullString
var userID sql.NullInt64
var apiKeyID sql.NullInt64
@ -456,7 +425,6 @@ LIMIT 1`
var upstreamLatency sql.NullInt64
var responseLatency sql.NullInt64
var ttft sql.NullInt64
var requestBodyBytes sql.NullInt64
var requestType sql.NullInt64
err := r.db.QueryRowContext(ctx, q, id).Scan(
@ -470,12 +438,9 @@ LIMIT 1`
&statusCode,
&out.Platform,
&out.Model,
&out.IsRetryable,
&out.RetryCount,
&out.Resolved,
&resolvedAt,
&resolvedBy,
&resolvedRetryID,
&out.ClientRequestID,
&out.RequestID,
&out.Message,
@ -506,10 +471,6 @@ LIMIT 1`
&upstreamLatency,
&responseLatency,
&ttft,
&out.RequestBody,
&out.RequestBodyTruncated,
&requestBodyBytes,
&out.RequestHeaders,
)
if err != nil {
return nil, err
@ -524,10 +485,6 @@ LIMIT 1`
v := resolvedBy.Int64
out.ResolvedByUserID = &v
}
if resolvedRetryID.Valid {
v := resolvedRetryID.Int64
out.ResolvedRetryID = &v
}
if clientIP.Valid {
s := clientIP.String
out.ClientIP = &s
@ -572,25 +529,11 @@ LIMIT 1`
v := ttft.Int64
out.TimeToFirstTokenMs = &v
}
if requestBodyBytes.Valid {
v := int(requestBodyBytes.Int64)
out.RequestBodyBytes = &v
}
if requestType.Valid {
v := int16(requestType.Int64)
out.RequestType = &v
}
// Normalize request_body to empty string when stored as JSON null.
out.RequestBody = strings.TrimSpace(out.RequestBody)
if out.RequestBody == "null" {
out.RequestBody = ""
}
// Normalize request_headers to empty string when stored as JSON null.
out.RequestHeaders = strings.TrimSpace(out.RequestHeaders)
if out.RequestHeaders == "null" {
out.RequestHeaders = ""
}
// Normalize upstream_errors to empty string when stored as JSON null.
out.UpstreamErrors = strings.TrimSpace(out.UpstreamErrors)
if out.UpstreamErrors == "null" {
@ -600,398 +543,7 @@ LIMIT 1`
return &out, nil
}
func (r *opsRepository) InsertRetryAttempt(ctx context.Context, input *service.OpsInsertRetryAttemptInput) (int64, error) {
if r == nil || r.db == nil {
return 0, fmt.Errorf("nil ops repository")
}
if input == nil {
return 0, fmt.Errorf("nil input")
}
if input.SourceErrorID <= 0 {
return 0, fmt.Errorf("invalid source_error_id")
}
if strings.TrimSpace(input.Mode) == "" {
return 0, fmt.Errorf("invalid mode")
}
q := `
INSERT INTO ops_retry_attempts (
requested_by_user_id,
source_error_id,
mode,
pinned_account_id,
status,
started_at
) VALUES (
$1,$2,$3,$4,$5,$6
) RETURNING id`
var id int64
err := r.db.QueryRowContext(
ctx,
q,
opsNullInt64(&input.RequestedByUserID),
input.SourceErrorID,
strings.TrimSpace(input.Mode),
opsNullInt64(input.PinnedAccountID),
strings.TrimSpace(input.Status),
input.StartedAt,
).Scan(&id)
if err != nil {
return 0, err
}
return id, nil
}
func (r *opsRepository) UpdateRetryAttempt(ctx context.Context, input *service.OpsUpdateRetryAttemptInput) error {
if r == nil || r.db == nil {
return fmt.Errorf("nil ops repository")
}
if input == nil {
return fmt.Errorf("nil input")
}
if input.ID <= 0 {
return fmt.Errorf("invalid id")
}
q := `
UPDATE ops_retry_attempts
SET
status = $2,
finished_at = $3,
duration_ms = $4,
success = $5,
http_status_code = $6,
upstream_request_id = $7,
used_account_id = $8,
response_preview = $9,
response_truncated = $10,
result_request_id = $11,
result_error_id = $12,
error_message = $13
WHERE id = $1`
_, err := r.db.ExecContext(
ctx,
q,
input.ID,
strings.TrimSpace(input.Status),
nullTime(input.FinishedAt),
input.DurationMs,
nullBool(input.Success),
nullInt(input.HTTPStatusCode),
opsNullString(input.UpstreamRequestID),
nullInt64(input.UsedAccountID),
opsNullString(input.ResponsePreview),
nullBool(input.ResponseTruncated),
opsNullString(input.ResultRequestID),
nullInt64(input.ResultErrorID),
opsNullString(input.ErrorMessage),
)
return err
}
func (r *opsRepository) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*service.OpsRetryAttempt, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
}
if sourceErrorID <= 0 {
return nil, fmt.Errorf("invalid source_error_id")
}
q := `
SELECT
id,
created_at,
COALESCE(requested_by_user_id, 0),
source_error_id,
COALESCE(mode, ''),
pinned_account_id,
COALESCE(status, ''),
started_at,
finished_at,
duration_ms,
success,
http_status_code,
upstream_request_id,
used_account_id,
response_preview,
response_truncated,
result_request_id,
result_error_id,
error_message
FROM ops_retry_attempts
WHERE source_error_id = $1
ORDER BY created_at DESC
LIMIT 1`
var out service.OpsRetryAttempt
var pinnedAccountID sql.NullInt64
var requestedBy sql.NullInt64
var startedAt sql.NullTime
var finishedAt sql.NullTime
var durationMs sql.NullInt64
var success sql.NullBool
var httpStatusCode sql.NullInt64
var upstreamRequestID sql.NullString
var usedAccountID sql.NullInt64
var responsePreview sql.NullString
var responseTruncated sql.NullBool
var resultRequestID sql.NullString
var resultErrorID sql.NullInt64
var errorMessage sql.NullString
err := r.db.QueryRowContext(ctx, q, sourceErrorID).Scan(
&out.ID,
&out.CreatedAt,
&requestedBy,
&out.SourceErrorID,
&out.Mode,
&pinnedAccountID,
&out.Status,
&startedAt,
&finishedAt,
&durationMs,
&success,
&httpStatusCode,
&upstreamRequestID,
&usedAccountID,
&responsePreview,
&responseTruncated,
&resultRequestID,
&resultErrorID,
&errorMessage,
)
if err != nil {
return nil, err
}
out.RequestedByUserID = requestedBy.Int64
if pinnedAccountID.Valid {
v := pinnedAccountID.Int64
out.PinnedAccountID = &v
}
if startedAt.Valid {
t := startedAt.Time
out.StartedAt = &t
}
if finishedAt.Valid {
t := finishedAt.Time
out.FinishedAt = &t
}
if durationMs.Valid {
v := durationMs.Int64
out.DurationMs = &v
}
if success.Valid {
v := success.Bool
out.Success = &v
}
if httpStatusCode.Valid {
v := int(httpStatusCode.Int64)
out.HTTPStatusCode = &v
}
if upstreamRequestID.Valid {
s := upstreamRequestID.String
out.UpstreamRequestID = &s
}
if usedAccountID.Valid {
v := usedAccountID.Int64
out.UsedAccountID = &v
}
if responsePreview.Valid {
s := responsePreview.String
out.ResponsePreview = &s
}
if responseTruncated.Valid {
v := responseTruncated.Bool
out.ResponseTruncated = &v
}
if resultRequestID.Valid {
s := resultRequestID.String
out.ResultRequestID = &s
}
if resultErrorID.Valid {
v := resultErrorID.Int64
out.ResultErrorID = &v
}
if errorMessage.Valid {
s := errorMessage.String
out.ErrorMessage = &s
}
return &out, nil
}
func nullTime(t time.Time) sql.NullTime {
if t.IsZero() {
return sql.NullTime{}
}
return sql.NullTime{Time: t, Valid: true}
}
func nullBool(v *bool) sql.NullBool {
if v == nil {
return sql.NullBool{}
}
return sql.NullBool{Bool: *v, Valid: true}
}
func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*service.OpsRetryAttempt, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
}
if sourceErrorID <= 0 {
return nil, fmt.Errorf("invalid source_error_id")
}
if limit <= 0 {
limit = 50
}
if limit > 200 {
limit = 200
}
q := `
SELECT
r.id,
r.created_at,
COALESCE(r.requested_by_user_id, 0),
r.source_error_id,
COALESCE(r.mode, ''),
r.pinned_account_id,
COALESCE(pa.name, ''),
COALESCE(r.status, ''),
r.started_at,
r.finished_at,
r.duration_ms,
r.success,
r.http_status_code,
r.upstream_request_id,
r.used_account_id,
COALESCE(ua.name, ''),
r.response_preview,
r.response_truncated,
r.result_request_id,
r.result_error_id,
r.error_message
FROM ops_retry_attempts r
LEFT JOIN accounts pa ON r.pinned_account_id = pa.id
LEFT JOIN accounts ua ON r.used_account_id = ua.id
WHERE r.source_error_id = $1
ORDER BY r.created_at DESC
LIMIT $2`
rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
out := make([]*service.OpsRetryAttempt, 0, 16)
for rows.Next() {
var item service.OpsRetryAttempt
var pinnedAccountID sql.NullInt64
var pinnedAccountName string
var requestedBy sql.NullInt64
var startedAt sql.NullTime
var finishedAt sql.NullTime
var durationMs sql.NullInt64
var success sql.NullBool
var httpStatusCode sql.NullInt64
var upstreamRequestID sql.NullString
var usedAccountID sql.NullInt64
var usedAccountName string
var responsePreview sql.NullString
var responseTruncated sql.NullBool
var resultRequestID sql.NullString
var resultErrorID sql.NullInt64
var errorMessage sql.NullString
if err := rows.Scan(
&item.ID,
&item.CreatedAt,
&requestedBy,
&item.SourceErrorID,
&item.Mode,
&pinnedAccountID,
&pinnedAccountName,
&item.Status,
&startedAt,
&finishedAt,
&durationMs,
&success,
&httpStatusCode,
&upstreamRequestID,
&usedAccountID,
&usedAccountName,
&responsePreview,
&responseTruncated,
&resultRequestID,
&resultErrorID,
&errorMessage,
); err != nil {
return nil, err
}
item.RequestedByUserID = requestedBy.Int64
if pinnedAccountID.Valid {
v := pinnedAccountID.Int64
item.PinnedAccountID = &v
}
item.PinnedAccountName = pinnedAccountName
if startedAt.Valid {
t := startedAt.Time
item.StartedAt = &t
}
if finishedAt.Valid {
t := finishedAt.Time
item.FinishedAt = &t
}
if durationMs.Valid {
v := durationMs.Int64
item.DurationMs = &v
}
if success.Valid {
v := success.Bool
item.Success = &v
}
if httpStatusCode.Valid {
v := int(httpStatusCode.Int64)
item.HTTPStatusCode = &v
}
if upstreamRequestID.Valid {
item.UpstreamRequestID = &upstreamRequestID.String
}
if usedAccountID.Valid {
v := usedAccountID.Int64
item.UsedAccountID = &v
}
item.UsedAccountName = usedAccountName
if responsePreview.Valid {
item.ResponsePreview = &responsePreview.String
}
if responseTruncated.Valid {
v := responseTruncated.Bool
item.ResponseTruncated = &v
}
if resultRequestID.Valid {
item.ResultRequestID = &resultRequestID.String
}
if resultErrorID.Valid {
v := resultErrorID.Int64
item.ResultErrorID = &v
}
if errorMessage.Valid {
item.ErrorMessage = &errorMessage.String
}
out = append(out, &item)
}
if err := rows.Err(); err != nil {
return nil, err
}
return out, nil
}
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error {
if r == nil || r.db == nil {
return fmt.Errorf("nil ops repository")
}
@ -1004,8 +556,7 @@ UPDATE ops_error_logs
SET
resolved = $2,
resolved_at = $3,
resolved_by_user_id = $4,
resolved_retry_id = $5
resolved_by_user_id = $4
WHERE id = $1`
at := sql.NullTime{}
@ -1023,7 +574,6 @@ WHERE id = $1`
resolved,
at,
nullInt64(resolvedByUserID),
nullInt64(resolvedRetryID),
)
return err
}

View File

@ -0,0 +1,44 @@
package repository
import (
"reflect"
"strings"
"testing"
"github.com/Wei-Shaw/sub2api/internal/service"
)
func TestOpsErrorLogInsertDoesNotPersistRequestReplayFields(t *testing.T) {
disallowedColumns := []string{
"request_body",
"request_headers",
"request_body_truncated",
"request_body_bytes",
"is_retryable",
"retry_count",
"resolved_retry_id",
}
insertSQL := strings.ToLower(insertOpsErrorLogSQL)
for _, column := range disallowedColumns {
if strings.Contains(insertSQL, column) {
t.Fatalf("ops error log insert still references dropped replay column %q", column)
}
}
inputType := reflect.TypeOf(service.OpsInsertErrorLogInput{})
disallowedFields := []string{
"RequestBodyJSON",
"RequestBodyTruncated",
"RequestBodyBytes",
"RequestHeadersJSON",
"IsRetryable",
"RetryCount",
"ResolvedRetryID",
}
for _, field := range disallowedFields {
if _, ok := inputType.FieldByName(field); ok {
t.Fatalf("OpsInsertErrorLogInput still carries replay field %q", field)
}
}
}

View File

@ -174,22 +174,17 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
// Error logs (legacy)
ops.GET("/errors", h.Admin.Ops.GetErrorLogs)
ops.GET("/errors/:id", h.Admin.Ops.GetErrorLogByID)
ops.GET("/errors/:id/retries", h.Admin.Ops.ListRetryAttempts)
ops.POST("/errors/:id/retry", h.Admin.Ops.RetryErrorRequest)
ops.PUT("/errors/:id/resolve", h.Admin.Ops.UpdateErrorResolution)
// Request errors (client-visible failures)
ops.GET("/request-errors", h.Admin.Ops.ListRequestErrors)
ops.GET("/request-errors/:id", h.Admin.Ops.GetRequestError)
ops.GET("/request-errors/:id/upstream-errors", h.Admin.Ops.ListRequestErrorUpstreamErrors)
ops.POST("/request-errors/:id/retry-client", h.Admin.Ops.RetryRequestErrorClient)
ops.POST("/request-errors/:id/upstream-errors/:idx/retry", h.Admin.Ops.RetryRequestErrorUpstreamEvent)
ops.PUT("/request-errors/:id/resolve", h.Admin.Ops.ResolveRequestError)
// Upstream errors (independent upstream failures)
ops.GET("/upstream-errors", h.Admin.Ops.ListUpstreamErrors)
ops.GET("/upstream-errors/:id", h.Admin.Ops.GetUpstreamError)
ops.POST("/upstream-errors/:id/retry", h.Admin.Ops.RetryUpstreamError)
ops.PUT("/upstream-errors/:id/resolve", h.Admin.Ops.ResolveUpstreamError)
// Request drilldown (success + error)

View File

@ -628,11 +628,6 @@ urlFallbackLoop:
return nil, err
}
// Capture upstream request body for ops retry of this attempt.
if p.c != nil && len(p.body) > 0 {
p.c.Set(OpsUpstreamRequestBodyKey, string(p.body))
}
resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency)
if err == nil && resp == nil {
err = errors.New("upstream returned nil response")

View File

@ -188,11 +188,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardStreamPreservesBodyAnd
require.NotContains(t, rec.Body.String(), `"cache_read_input_tokens":7`, "透传输出不应被网关改写")
require.Equal(t, 7, result.Usage.CacheReadInputTokens, "计费 usage 解析应保留 cached_tokens 兼容")
require.Empty(t, rec.Header().Get("Set-Cookie"), "响应头应经过安全过滤")
rawBody, ok := c.Get(OpsUpstreamRequestBodyKey)
require.True(t, ok)
bodyBytes, ok := rawBody.([]byte)
require.True(t, ok, "应以 []byte 形式缓存上游请求体,避免重复 string 拷贝")
require.Equal(t, "claude-3-haiku-20240307", gjson.GetBytes(bodyBytes, "model").String(), "缓存的上游请求体应包含映射后的模型")
}
func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardCountTokensPreservesBody(t *testing.T) {
@ -938,10 +933,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_UpstreamRequest
require.Error(t, err)
require.Contains(t, err.Error(), "upstream request failed")
require.Equal(t, http.StatusBadGateway, rec.Code)
rawBody, ok := c.Get(OpsUpstreamRequestBodyKey)
require.True(t, ok)
_, ok = rawBody.([]byte)
require.True(t, ok)
}
func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_EmptyResponseBody(t *testing.T) {

View File

@ -4525,9 +4525,6 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400.
body = StripEmptyTextBlocks(body)
// 重试间复用同一请求体,避免每次 string(body) 产生额外分配。
setOpsUpstreamRequestBody(c, body)
// 重试循环
var resp *http.Response
retryStart := time.Now()
@ -5018,9 +5015,6 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
// Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400.
input.Body = StripEmptyTextBlocks(input.Body)
// 重试间复用同一请求体,避免每次 string(body) 产生额外分配。
setOpsUpstreamRequestBody(c, input.Body)
var resp *http.Response
retryStart := time.Now()
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
@ -6201,7 +6195,6 @@ func (s *GatewayService) buildUpstreamRequestAnthropicVertex(
if err != nil {
return nil, err
}
setOpsUpstreamRequestBody(c, vertexBody)
fullURL, err := buildVertexAnthropicURL(account.VertexProjectID(), account.VertexLocation(modelID), modelID, reqStream)
if err != nil {
return nil, err

View File

@ -123,10 +123,6 @@ func (s *GeminiMessagesCompatService) forwardClaudeBodyAsChatCompletions(
}
requestIDHeader = idHeader
if c != nil {
c.Set(OpsUpstreamRequestBodyKey, string(geminiReq))
}
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())

View File

@ -766,12 +766,6 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
}
requestIDHeader = idHeader
// Capture upstream request body for ops retry of this attempt.
if c != nil {
// In this code path `body` is already the JSON sent to upstream.
c.Set(OpsUpstreamRequestBodyKey, string(body))
}
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())
@ -1293,12 +1287,6 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
}
requestIDHeader = idHeader
// Capture upstream request body for ops retry of this attempt.
if c != nil {
// In this code path `body` is already the JSON sent to upstream.
c.Set(OpsUpstreamRequestBodyKey, string(body))
}
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
safeErr := sanitizeUpstreamErrorMessage(err.Error())

View File

@ -2473,9 +2473,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
return nil, err
}
// Capture upstream request body for ops retry of this attempt.
setOpsUpstreamRequestBody(c, body)
// 命中 WS 时仅走 WebSocket Mode不再自动回退 HTTP。
if wsDecision.Transport == OpenAIUpstreamTransportResponsesWebsocketV2 {
wsReqBody := reqBody
@ -2748,7 +2745,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
if err != nil {
return nil, fmt.Errorf("serialize invalid_encrypted_content retry body: %w", err)
}
setOpsUpstreamRequestBody(c, body)
httpInvalidEncryptedContentRetryTried = true
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Retrying non-WSv2 request once after invalid_encrypted_content (account: %s)", account.Name)
continue
@ -2786,6 +2782,10 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
}
defer func() { _ = resp.Body.Close() }()
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
serviceTier := extractOpenAIServiceTier(reqBody)
releaseOpenAIParsedRequestBody(c)
// Handle normal response
var usage *OpenAIUsage
var firstTokenMs *int
@ -2821,9 +2821,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
usage = &OpenAIUsage{}
}
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
serviceTier := extractOpenAIServiceTier(reqBody)
forwardResult := &OpenAIForwardResult{
RequestID: resp.Header.Get("x-request-id"),
Usage: *usage,
@ -3006,7 +3003,6 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
proxyURL = account.Proxy.URL()
}
setOpsUpstreamRequestBody(c, body)
if c != nil {
c.Set("openai_passthrough", true)
}
@ -3045,6 +3041,8 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
return nil, s.handleErrorResponsePassthrough(ctx, resp, c, account, body)
}
serviceTier := extractOpenAIServiceTierFromBody(body)
var usage *OpenAIUsage
var firstTokenMs *int
imageCount := 0
@ -3081,7 +3079,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
Usage: *usage,
Model: reqModel,
UpstreamModel: upstreamPassthroughModel,
ServiceTier: extractOpenAIServiceTierFromBody(body),
ServiceTier: serviceTier,
ReasoningEffort: reasoningEffort,
Stream: reqStream,
OpenAIWSMode: false,
@ -6503,6 +6501,13 @@ func getOpenAIRequestBodyMap(c *gin.Context, body []byte) (map[string]any, error
return reqBody, nil
}
func releaseOpenAIParsedRequestBody(c *gin.Context) {
if c == nil {
return
}
delete(c.Keys, OpenAIParsedRequestBodyKey)
}
func extractOpenAIReasoningEffort(reqBody map[string]any, requestedModel string) *string {
if value, present := getOpenAIReasoningEffortFromReqBody(reqBody); present {
if value == "" {

View File

@ -588,11 +588,7 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesAPIKey(
if err != nil {
return nil, err
}
if !parsed.Multipart {
setOpsUpstreamRequestBody(c, forwardBody)
}
upstreamCtx, releaseUpstreamCtx := detachUpstreamContext(ctx)
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, parsed.Stream)
defer releaseUpstreamCtx()
token, _, err := s.GetAccessToken(upstreamCtx, account)

View File

@ -979,8 +979,6 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesOAuth(
if err != nil {
return nil, err
}
setOpsUpstreamRequestBody(c, responsesBody)
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, responsesBody, token, true, parsed.StickySessionSeed(), false)
if err != nil {
return nil, err

View File

@ -26,7 +26,6 @@ type opsCleanupTarget struct {
type opsCleanupDeletedCounts struct {
errorLogs int64
retryAttempts int64
alertEvents int64
systemLogs int64
logAudits int64
@ -37,9 +36,8 @@ type opsCleanupDeletedCounts struct {
func (c opsCleanupDeletedCounts) String() string {
return fmt.Sprintf(
"error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
"error_logs=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
c.errorLogs,
c.retryAttempts,
c.alertEvents,
c.systemLogs,
c.logAudits,

View File

@ -299,7 +299,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
targets := []opsCleanupTarget{
{effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs},
{effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts},
{effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents},
{effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs},
{effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits},

View File

@ -37,14 +37,10 @@ type OpsErrorLog struct {
Platform string `json:"platform"`
Model string `json:"model"`
IsRetryable bool `json:"is_retryable"`
RetryCount int `json:"retry_count"`
Resolved bool `json:"resolved"`
ResolvedAt *time.Time `json:"resolved_at"`
ResolvedByUserID *int64 `json:"resolved_by_user_id"`
ResolvedByUserName string `json:"resolved_by_user_name"`
ResolvedRetryID *int64 `json:"resolved_retry_id"`
ResolvedStatusRaw string `json:"-"`
ClientRequestID string `json:"client_request_id"`
@ -89,12 +85,6 @@ type OpsErrorLogDetail struct {
ResponseLatencyMs *int64 `json:"response_latency_ms"`
TimeToFirstTokenMs *int64 `json:"time_to_first_token_ms"`
// Retry context
RequestBody string `json:"request_body"`
RequestBodyTruncated bool `json:"request_body_truncated"`
RequestBodyBytes *int `json:"request_body_bytes"`
RequestHeaders string `json:"request_headers,omitempty"`
// vNext metric semantics
IsBusinessLimited bool `json:"is_business_limited"`
}
@ -136,55 +126,3 @@ type OpsErrorLogList struct {
Page int `json:"page"`
PageSize int `json:"page_size"`
}
type OpsRetryAttempt struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
RequestedByUserID int64 `json:"requested_by_user_id"`
SourceErrorID int64 `json:"source_error_id"`
Mode string `json:"mode"`
PinnedAccountID *int64 `json:"pinned_account_id"`
PinnedAccountName string `json:"pinned_account_name"`
Status string `json:"status"`
StartedAt *time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at"`
DurationMs *int64 `json:"duration_ms"`
// Persisted execution results (best-effort)
Success *bool `json:"success"`
HTTPStatusCode *int `json:"http_status_code"`
UpstreamRequestID *string `json:"upstream_request_id"`
UsedAccountID *int64 `json:"used_account_id"`
UsedAccountName string `json:"used_account_name"`
ResponsePreview *string `json:"response_preview"`
ResponseTruncated *bool `json:"response_truncated"`
// Optional correlation
ResultRequestID *string `json:"result_request_id"`
ResultErrorID *int64 `json:"result_error_id"`
ErrorMessage *string `json:"error_message"`
}
type OpsRetryResult struct {
AttemptID int64 `json:"attempt_id"`
Mode string `json:"mode"`
Status string `json:"status"`
PinnedAccountID *int64 `json:"pinned_account_id"`
UsedAccountID *int64 `json:"used_account_id"`
HTTPStatusCode int `json:"http_status_code"`
UpstreamRequestID string `json:"upstream_request_id"`
ResponsePreview string `json:"response_preview"`
ResponseTruncated bool `json:"response_truncated"`
ErrorMessage string `json:"error_message"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
DurationMs int64 `json:"duration_ms"`
}

View File

@ -16,11 +16,7 @@ type OpsRepository interface {
DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error)
ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error)
UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error
UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error
// Lightweight window stats (for realtime WS / quick sampling).
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
@ -121,51 +117,9 @@ type OpsInsertErrorLogInput struct {
ResponseLatencyMs *int64
TimeToFirstTokenMs *int64
RequestBodyJSON *string // sanitized json string (not raw bytes)
RequestBodyTruncated bool
RequestBodyBytes *int
RequestHeadersJSON *string // optional json string
IsRetryable bool
RetryCount int
CreatedAt time.Time
}
type OpsInsertRetryAttemptInput struct {
RequestedByUserID int64
SourceErrorID int64
Mode string
PinnedAccountID *int64
// running|queued etc.
Status string
StartedAt time.Time
}
type OpsUpdateRetryAttemptInput struct {
ID int64
// succeeded|failed
Status string
FinishedAt time.Time
DurationMs int64
// Persisted execution results (best-effort)
Success *bool
HTTPStatusCode *int
UpstreamRequestID *string
UsedAccountID *int64
ResponsePreview *string
ResponseTruncated *bool
// Optional correlation (legacy fields kept)
ResultRequestID *string
ResultErrorID *int64
ErrorMessage *string
}
type OpsInsertSystemMetricsInput struct {
CreatedAt time.Time
WindowMinutes int

View File

@ -69,23 +69,7 @@ func (m *opsRepoMock) InsertSystemLogCleanupAudit(ctx context.Context, input *Op
return nil
}
func (m *opsRepoMock) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) {
return 0, nil
}
func (m *opsRepoMock) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error {
return nil
}
func (m *opsRepoMock) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) {
return nil, nil
}
func (m *opsRepoMock) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) {
return []*OpsRetryAttempt{}, nil
}
func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error {
return nil
}

View File

@ -1,726 +0,0 @@
package service
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/domain"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/gin-gonic/gin"
"github.com/lib/pq"
)
const (
OpsRetryModeClient = "client"
OpsRetryModeUpstream = "upstream"
)
const (
opsRetryStatusRunning = "running"
opsRetryStatusSucceeded = "succeeded"
opsRetryStatusFailed = "failed"
)
const (
opsRetryTimeout = 60 * time.Second
opsRetryCaptureBytesLimit = 64 * 1024
opsRetryResponsePreviewMax = 8 * 1024
opsRetryMinIntervalPerError = 10 * time.Second
opsRetryMaxAccountSwitches = 3
)
var opsRetryRequestHeaderAllowlist = map[string]bool{
"anthropic-beta": true,
"anthropic-version": true,
}
type opsRetryRequestType string
const (
opsRetryTypeMessages opsRetryRequestType = "messages"
opsRetryTypeOpenAI opsRetryRequestType = "openai_responses"
opsRetryTypeGeminiV1B opsRetryRequestType = "gemini_v1beta"
)
type limitedResponseWriter struct {
header http.Header
wroteHeader bool
limit int
totalWritten int64
buf bytes.Buffer
}
func newLimitedResponseWriter(limit int) *limitedResponseWriter {
if limit <= 0 {
limit = 1
}
return &limitedResponseWriter{
header: make(http.Header),
limit: limit,
}
}
func (w *limitedResponseWriter) Header() http.Header {
return w.header
}
func (w *limitedResponseWriter) WriteHeader(statusCode int) {
if w.wroteHeader {
return
}
w.wroteHeader = true
}
func (w *limitedResponseWriter) Write(p []byte) (int, error) {
if !w.wroteHeader {
w.WriteHeader(http.StatusOK)
}
w.totalWritten += int64(len(p))
if w.buf.Len() < w.limit {
remaining := w.limit - w.buf.Len()
if len(p) > remaining {
_, _ = w.buf.Write(p[:remaining])
} else {
_, _ = w.buf.Write(p)
}
}
// Pretend we wrote everything to avoid upstream/client code treating it as an error.
return len(p), nil
}
func (w *limitedResponseWriter) Flush() {}
func (w *limitedResponseWriter) bodyBytes() []byte {
return w.buf.Bytes()
}
func (w *limitedResponseWriter) truncated() bool {
return w.totalWritten > int64(w.limit)
}
const (
OpsRetryModeUpstreamEvent = "upstream_event"
)
func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
mode = strings.ToLower(strings.TrimSpace(mode))
switch mode {
case OpsRetryModeClient, OpsRetryModeUpstream:
default:
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream")
}
errorLog, err := s.GetErrorLogByID(ctx, errorID)
if err != nil {
return nil, err
}
if errorLog == nil {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
if strings.TrimSpace(errorLog.RequestBody) == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
}
var pinned *int64
if mode == OpsRetryModeUpstream {
if pinnedAccountID != nil && *pinnedAccountID > 0 {
pinned = pinnedAccountID
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
pinned = errorLog.AccountID
} else {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
}
}
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, mode, mode, pinned, errorLog)
}
// RetryUpstreamEvent retries a specific upstream attempt captured inside ops_error_logs.upstream_errors.
// idx is 0-based. It always pins the original event account_id.
func (s *OpsService) RetryUpstreamEvent(ctx context.Context, requestedByUserID int64, errorID int64, idx int) (*OpsRetryResult, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if idx < 0 {
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_UPSTREAM_IDX", "invalid upstream idx")
}
errorLog, err := s.GetErrorLogByID(ctx, errorID)
if err != nil {
return nil, err
}
if errorLog == nil {
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
}
events, err := ParseOpsUpstreamErrors(errorLog.UpstreamErrors)
if err != nil {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENTS_INVALID", "invalid upstream_errors")
}
if idx >= len(events) {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_IDX_OOB", "upstream idx out of range")
}
ev := events[idx]
if ev == nil {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENT_MISSING", "upstream event missing")
}
if ev.AccountID <= 0 {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
}
upstreamBody := strings.TrimSpace(ev.UpstreamRequestBody)
if upstreamBody == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_NO_REQUEST_BODY", "No upstream request body found to retry")
}
override := *errorLog
override.RequestBody = upstreamBody
pinned := ev.AccountID
// Persist as upstream_event, execute as upstream pinned retry.
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, OpsRetryModeUpstreamEvent, OpsRetryModeUpstream, &pinned, &override)
}
func (s *OpsService) retryWithErrorLog(ctx context.Context, requestedByUserID int64, errorID int64, mode string, execMode string, pinnedAccountID *int64, errorLog *OpsErrorLogDetail) (*OpsRetryResult, error) {
latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err)
}
if latest != nil {
if strings.EqualFold(latest.Status, opsRetryStatusRunning) || strings.EqualFold(latest.Status, "queued") {
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
}
lastAttemptAt := latest.CreatedAt
if latest.FinishedAt != nil && !latest.FinishedAt.IsZero() {
lastAttemptAt = *latest.FinishedAt
} else if latest.StartedAt != nil && !latest.StartedAt.IsZero() {
lastAttemptAt = *latest.StartedAt
}
if time.Since(lastAttemptAt) < opsRetryMinIntervalPerError {
return nil, infraerrors.Conflict("OPS_RETRY_TOO_FREQUENT", "Please wait before retrying this error again")
}
}
if errorLog == nil || strings.TrimSpace(errorLog.RequestBody) == "" {
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
}
var pinned *int64
if execMode == OpsRetryModeUpstream {
if pinnedAccountID != nil && *pinnedAccountID > 0 {
pinned = pinnedAccountID
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
pinned = errorLog.AccountID
} else {
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
}
}
startedAt := time.Now()
attemptID, err := s.opsRepo.InsertRetryAttempt(ctx, &OpsInsertRetryAttemptInput{
RequestedByUserID: requestedByUserID,
SourceErrorID: errorID,
Mode: mode,
PinnedAccountID: pinned,
Status: opsRetryStatusRunning,
StartedAt: startedAt,
})
if err != nil {
var pqErr *pq.Error
if errors.As(err, &pqErr) && string(pqErr.Code) == "23505" {
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
}
return nil, infraerrors.InternalServer("OPS_RETRY_CREATE_ATTEMPT_FAILED", "Failed to create retry attempt").WithCause(err)
}
result := &OpsRetryResult{
AttemptID: attemptID,
Mode: mode,
Status: opsRetryStatusFailed,
PinnedAccountID: pinned,
HTTPStatusCode: 0,
UpstreamRequestID: "",
ResponsePreview: "",
ResponseTruncated: false,
ErrorMessage: "",
StartedAt: startedAt,
}
execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout)
defer cancel()
execRes := s.executeRetry(execCtx, errorLog, execMode, pinned)
finishedAt := time.Now()
result.FinishedAt = finishedAt
result.DurationMs = finishedAt.Sub(startedAt).Milliseconds()
if execRes != nil {
result.Status = execRes.status
result.UsedAccountID = execRes.usedAccountID
result.HTTPStatusCode = execRes.httpStatusCode
result.UpstreamRequestID = execRes.upstreamRequestID
result.ResponsePreview = execRes.responsePreview
result.ResponseTruncated = execRes.responseTruncated
result.ErrorMessage = execRes.errorMessage
}
updateCtx, updateCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer updateCancel()
var updateErrMsg *string
if strings.TrimSpace(result.ErrorMessage) != "" {
msg := result.ErrorMessage
updateErrMsg = &msg
}
// Keep legacy result_request_id empty; use upstream_request_id instead.
var resultRequestID *string
finalStatus := result.Status
if strings.TrimSpace(finalStatus) == "" {
finalStatus = opsRetryStatusFailed
}
success := strings.EqualFold(finalStatus, opsRetryStatusSucceeded)
httpStatus := result.HTTPStatusCode
upstreamReqID := result.UpstreamRequestID
usedAccountID := result.UsedAccountID
preview := result.ResponsePreview
truncated := result.ResponseTruncated
if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{
ID: attemptID,
Status: finalStatus,
FinishedAt: finishedAt,
DurationMs: result.DurationMs,
Success: &success,
HTTPStatusCode: &httpStatus,
UpstreamRequestID: &upstreamReqID,
UsedAccountID: usedAccountID,
ResponsePreview: &preview,
ResponseTruncated: &truncated,
ResultRequestID: resultRequestID,
ErrorMessage: updateErrMsg,
}); err != nil {
log.Printf("[Ops] UpdateRetryAttempt failed: %v", err)
} else if success {
if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil {
log.Printf("[Ops] UpdateErrorResolution failed: %v", err)
}
}
return result, nil
}
type opsRetryExecution struct {
status string
usedAccountID *int64
httpStatusCode int
upstreamRequestID string
responsePreview string
responseTruncated bool
errorMessage string
}
func (s *OpsService) executeRetry(ctx context.Context, errorLog *OpsErrorLogDetail, mode string, pinnedAccountID *int64) *opsRetryExecution {
if errorLog == nil {
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "missing error log",
}
}
reqType := detectOpsRetryType(errorLog.RequestPath)
bodyBytes := []byte(errorLog.RequestBody)
switch reqType {
case opsRetryTypeMessages:
bodyBytes = FilterThinkingBlocksForRetry(bodyBytes)
case opsRetryTypeOpenAI, opsRetryTypeGeminiV1B:
// No-op
}
switch strings.ToLower(strings.TrimSpace(mode)) {
case OpsRetryModeUpstream:
if pinnedAccountID == nil || *pinnedAccountID <= 0 {
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "pinned_account_id required for upstream retry",
}
}
return s.executePinnedRetry(ctx, reqType, errorLog, bodyBytes, *pinnedAccountID)
case OpsRetryModeClient:
return s.executeClientRetry(ctx, reqType, errorLog, bodyBytes)
default:
return &opsRetryExecution{
status: opsRetryStatusFailed,
errorMessage: "invalid retry mode",
}
}
}
func detectOpsRetryType(path string) opsRetryRequestType {
p := strings.ToLower(strings.TrimSpace(path))
switch {
case strings.Contains(p, "/responses"), strings.Contains(p, "/images/"):
return opsRetryTypeOpenAI
case strings.Contains(p, "/v1beta/"):
return opsRetryTypeGeminiV1B
default:
return opsRetryTypeMessages
}
}
func (s *OpsService) executePinnedRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, pinnedAccountID int64) *opsRetryExecution {
if s.accountRepo == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account repository not available"}
}
account, err := s.accountRepo.GetByID(ctx, pinnedAccountID)
if err != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("account not found: %v", err)}
}
if account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account not found"}
}
if !account.IsSchedulable() {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account is not schedulable"}
}
if errorLog.GroupID != nil && *errorLog.GroupID > 0 {
if !containsInt64(account.GroupIDs, *errorLog.GroupID) {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "pinned account is not in the same group as the original request"}
}
}
var release func()
if s.concurrencyService != nil {
acq, err := s.concurrencyService.AcquireAccountSlot(ctx, account.ID, account.Concurrency)
if err != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("acquire account slot failed: %v", err)}
}
if acq == nil || !acq.Acquired {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account concurrency limit reached"}
}
release = acq.ReleaseFunc
}
if release != nil {
defer release()
}
usedID := account.ID
exec := s.executeWithAccount(ctx, reqType, errorLog, body, account)
exec.usedAccountID = &usedID
if exec.status == "" {
exec.status = opsRetryStatusFailed
}
return exec
}
func (s *OpsService) executeClientRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) *opsRetryExecution {
groupID := errorLog.GroupID
if groupID == nil || *groupID <= 0 {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "group_id missing; cannot reselect account"}
}
model, stream, parsedErr := extractRetryModelAndStream(reqType, errorLog, body)
if parsedErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: parsedErr.Error()}
}
_ = stream
excluded := make(map[int64]struct{})
switches := 0
for {
if switches >= opsRetryMaxAccountSwitches {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "retry failed after exhausting account failovers"}
}
selection, selErr := s.selectAccountForRetry(ctx, reqType, groupID, model, excluded)
if selErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: selErr.Error()}
}
if selection == nil || selection.Account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: ErrNoAvailableAccounts.Error()}
}
account := selection.Account
if !selection.Acquired || selection.ReleaseFunc == nil {
excluded[account.ID] = struct{}{}
switches++
continue
}
attemptCtx := ctx
if switches > 0 {
attemptCtx = WithAccountSwitchCount(attemptCtx, switches, false)
}
exec := func() *opsRetryExecution {
defer selection.ReleaseFunc()
return s.executeWithAccount(attemptCtx, reqType, errorLog, body, account)
}()
if exec != nil {
if exec.status == opsRetryStatusSucceeded {
usedID := account.ID
exec.usedAccountID = &usedID
return exec
}
// If the gateway services ask for failover, try another account.
if s.isFailoverError(exec.errorMessage) {
excluded[account.ID] = struct{}{}
switches++
continue
}
usedID := account.ID
exec.usedAccountID = &usedID
return exec
}
excluded[account.ID] = struct{}{}
switches++
}
}
func (s *OpsService) selectAccountForRetry(ctx context.Context, reqType opsRetryRequestType, groupID *int64, model string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) {
switch reqType {
case opsRetryTypeOpenAI:
if s.openAIGatewayService == nil {
return nil, fmt.Errorf("openai gateway service not available")
}
return s.openAIGatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs)
case opsRetryTypeGeminiV1B, opsRetryTypeMessages:
if s.gatewayService == nil {
return nil, fmt.Errorf("gateway service not available")
}
return s.gatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs, "", int64(0)) // 重试不使用会话限制
default:
return nil, fmt.Errorf("unsupported retry type: %s", reqType)
}
}
func extractRetryModelAndStream(reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) (model string, stream bool, err error) {
switch reqType {
case opsRetryTypeMessages:
parsed, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic)
if parseErr != nil {
return "", false, fmt.Errorf("failed to parse messages request body: %w", parseErr)
}
return parsed.Model, parsed.Stream, nil
case opsRetryTypeOpenAI:
var v struct {
Model string `json:"model"`
Stream bool `json:"stream"`
}
if err := json.Unmarshal(body, &v); err != nil {
return "", false, fmt.Errorf("failed to parse openai request body: %w", err)
}
return strings.TrimSpace(v.Model), v.Stream, nil
case opsRetryTypeGeminiV1B:
if strings.TrimSpace(errorLog.Model) == "" {
return "", false, fmt.Errorf("missing model for gemini v1beta retry")
}
return strings.TrimSpace(errorLog.Model), errorLog.Stream, nil
default:
return "", false, fmt.Errorf("unsupported retry type: %s", reqType)
}
}
func (s *OpsService) executeWithAccount(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, account *Account) *opsRetryExecution {
if account == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "missing account"}
}
c, w := newOpsRetryContext(ctx, errorLog)
var err error
switch reqType {
case opsRetryTypeOpenAI:
if s.openAIGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "openai gateway service not available"}
}
_, err = s.openAIGatewayService.Forward(ctx, c, account, body)
case opsRetryTypeGeminiV1B:
if s.geminiCompatService == nil || s.antigravityGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini services not available"}
}
modelName := strings.TrimSpace(errorLog.Model)
action := "generateContent"
if errorLog.Stream {
action = "streamGenerateContent"
}
if account.Platform == PlatformAntigravity {
_, err = s.antigravityGatewayService.ForwardGemini(ctx, c, account, modelName, action, errorLog.Stream, body, false)
} else {
_, err = s.geminiCompatService.ForwardNative(ctx, c, account, modelName, action, errorLog.Stream, body)
}
case opsRetryTypeMessages:
switch account.Platform {
case PlatformAntigravity:
if s.antigravityGatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "antigravity gateway service not available"}
}
_, err = s.antigravityGatewayService.Forward(ctx, c, account, body, false)
case PlatformGemini:
if s.geminiCompatService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini gateway service not available"}
}
_, err = s.geminiCompatService.Forward(ctx, c, account, body)
default:
if s.gatewayService == nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gateway service not available"}
}
parsedReq, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic)
if parseErr != nil {
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "failed to parse request body"}
}
_, err = s.gatewayService.Forward(ctx, c, account, parsedReq)
}
default:
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "unsupported retry type"}
}
statusCode := http.StatusOK
if c != nil && c.Writer != nil {
statusCode = c.Writer.Status()
}
upstreamReqID := extractUpstreamRequestID(c)
preview, truncated := extractResponsePreview(w)
exec := &opsRetryExecution{
status: opsRetryStatusFailed,
httpStatusCode: statusCode,
upstreamRequestID: upstreamReqID,
responsePreview: preview,
responseTruncated: truncated,
errorMessage: "",
}
if err == nil && statusCode < 400 {
exec.status = opsRetryStatusSucceeded
return exec
}
if err != nil {
exec.errorMessage = err.Error()
} else {
exec.errorMessage = fmt.Sprintf("upstream returned status %d", statusCode)
}
return exec
}
func newOpsRetryContext(ctx context.Context, errorLog *OpsErrorLogDetail) (*gin.Context, *limitedResponseWriter) {
w := newLimitedResponseWriter(opsRetryCaptureBytesLimit)
c, _ := gin.CreateTestContext(w)
path := "/"
if errorLog != nil && strings.TrimSpace(errorLog.RequestPath) != "" {
path = errorLog.RequestPath
}
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost"+path, bytes.NewReader(nil))
req.Header.Set("content-type", "application/json")
if errorLog != nil && strings.TrimSpace(errorLog.UserAgent) != "" {
req.Header.Set("user-agent", errorLog.UserAgent)
}
// Restore a minimal, whitelisted subset of request headers to improve retry fidelity
// (e.g. anthropic-beta / anthropic-version). Never replay auth credentials.
if errorLog != nil && strings.TrimSpace(errorLog.RequestHeaders) != "" {
var stored map[string]string
if err := json.Unmarshal([]byte(errorLog.RequestHeaders), &stored); err == nil {
for k, v := range stored {
key := strings.TrimSpace(k)
if key == "" {
continue
}
if !opsRetryRequestHeaderAllowlist[strings.ToLower(key)] {
continue
}
val := strings.TrimSpace(v)
if val == "" {
continue
}
req.Header.Set(key, val)
}
}
}
c.Request = req
SetOpenAIClientTransport(c, OpenAIClientTransportHTTP)
return c, w
}
func extractUpstreamRequestID(c *gin.Context) string {
if c == nil || c.Writer == nil {
return ""
}
h := c.Writer.Header()
if h == nil {
return ""
}
for _, key := range []string{"x-request-id", "X-Request-Id", "X-Request-ID"} {
if v := strings.TrimSpace(h.Get(key)); v != "" {
return v
}
}
return ""
}
func extractResponsePreview(w *limitedResponseWriter) (preview string, truncated bool) {
if w == nil {
return "", false
}
b := bytes.TrimSpace(w.bodyBytes())
if len(b) == 0 {
return "", w.truncated()
}
if len(b) > opsRetryResponsePreviewMax {
return string(b[:opsRetryResponsePreviewMax]), true
}
return string(b), w.truncated()
}
func containsInt64(items []int64, needle int64) bool {
for _, v := range items {
if v == needle {
return true
}
}
return false
}
func (s *OpsService) isFailoverError(message string) bool {
msg := strings.ToLower(strings.TrimSpace(message))
if msg == "" {
return false
}
return strings.Contains(msg, "upstream error:") && strings.Contains(msg, "failover")
}

View File

@ -1,47 +0,0 @@
package service
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestNewOpsRetryContext_SetsHTTPTransportAndRequestHeaders(t *testing.T) {
errorLog := &OpsErrorLogDetail{
OpsErrorLog: OpsErrorLog{
RequestPath: "/openai/v1/responses",
},
UserAgent: "ops-retry-agent/1.0",
RequestHeaders: `{
"anthropic-beta":"beta-v1",
"ANTHROPIC-VERSION":"2023-06-01",
"authorization":"Bearer should-not-forward"
}`,
}
c, w := newOpsRetryContext(context.Background(), errorLog)
require.NotNil(t, c)
require.NotNil(t, w)
require.NotNil(t, c.Request)
require.Equal(t, "/openai/v1/responses", c.Request.URL.Path)
require.Equal(t, "application/json", c.Request.Header.Get("Content-Type"))
require.Equal(t, "ops-retry-agent/1.0", c.Request.Header.Get("User-Agent"))
require.Equal(t, "beta-v1", c.Request.Header.Get("anthropic-beta"))
require.Equal(t, "2023-06-01", c.Request.Header.Get("anthropic-version"))
require.Empty(t, c.Request.Header.Get("authorization"), "未在白名单内的敏感头不应被重放")
require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c))
}
func TestNewOpsRetryContext_InvalidHeadersJSONStillSetsHTTPTransport(t *testing.T) {
errorLog := &OpsErrorLogDetail{
RequestHeaders: "{invalid-json",
}
c, _ := newOpsRetryContext(context.Background(), errorLog)
require.NotNil(t, c)
require.NotNil(t, c.Request)
require.Equal(t, "/", c.Request.URL.Path)
require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c))
}

View File

@ -16,26 +16,9 @@ import (
var ErrOpsDisabled = infraerrors.NotFound("OPS_DISABLED", "Ops monitoring is disabled")
const (
opsMaxStoredRequestBodyBytes = 256 * 1024
opsMaxStoredErrorBodyBytes = 20 * 1024
opsMaxStoredErrorBodyBytes = 20 * 1024
)
// PrepareOpsRequestBodyForQueue 在入队前对请求体执行脱敏与裁剪,返回可直接写入 OpsInsertErrorLogInput 的字段。
// 该方法用于避免异步队列持有大块原始请求体,减少错误风暴下的内存放大风险。
func PrepareOpsRequestBodyForQueue(raw []byte) (requestBodyJSON *string, truncated bool, requestBodyBytes *int) {
if len(raw) == 0 {
return nil, false, nil
}
sanitized, truncated, bytesLen := sanitizeAndTrimRequestBody(raw, opsMaxStoredRequestBodyBytes)
if sanitized != "" {
out := sanitized
requestBodyJSON = &out
}
n := bytesLen
requestBodyBytes = &n
return requestBodyJSON, truncated, requestBodyBytes
}
// OpsService provides ingestion and query APIs for the Ops monitoring module.
type OpsService struct {
opsRepo OpsRepository
@ -138,8 +121,8 @@ func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool {
}
}
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error {
prepared, ok, err := s.prepareErrorLogInput(ctx, entry, rawRequestBody)
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput) error {
prepared, ok, err := s.prepareErrorLogInput(ctx, entry)
if err != nil {
log.Printf("[Ops] RecordError prepare failed: %v", err)
return err
@ -162,7 +145,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE
}
prepared := make([]*OpsInsertErrorLogInput, 0, len(entries))
for _, entry := range entries {
item, ok, err := s.prepareErrorLogInput(ctx, entry, nil)
item, ok, err := s.prepareErrorLogInput(ctx, entry)
if err != nil {
log.Printf("[Ops] RecordErrorBatch prepare failed: %v", err)
continue
@ -198,7 +181,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE
return nil
}
func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) (*OpsInsertErrorLogInput, bool, error) {
func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput) (*OpsInsertErrorLogInput, bool, error) {
if entry == nil {
return nil, false, nil
}
@ -224,11 +207,6 @@ func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertE
entry.ErrorType = "api_error"
}
// Sanitize + trim request body (errors only).
if len(rawRequestBody) > 0 {
entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = PrepareOpsRequestBodyForQueue(rawRequestBody)
}
// Sanitize + truncate error_body to avoid storing sensitive data.
if strings.TrimSpace(entry.ErrorBody) != "" {
sanitized, _ := sanitizeErrorBodyForStorage(entry.ErrorBody, opsMaxStoredErrorBodyBytes)
@ -315,25 +293,6 @@ func sanitizeOpsUpstreamErrors(entry *OpsInsertErrorLogInput) error {
out.Detail = ""
}
out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody)
if out.UpstreamRequestBody != "" {
// Reuse the same sanitization/trimming strategy as request body storage.
// Keep it small so it is safe to persist in ops_error_logs JSON.
sanitizedBody, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024)
if sanitizedBody != "" {
out.UpstreamRequestBody = sanitizedBody
if truncated {
out.Kind = strings.TrimSpace(out.Kind)
if out.Kind == "" {
out.Kind = "upstream"
}
out.Kind = out.Kind + ":request_body_truncated"
}
} else {
out.UpstreamRequestBody = ""
}
}
// Drop fully-empty events (can happen if only status code was known).
if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" {
continue
@ -381,27 +340,7 @@ func (s *OpsService) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLo
return detail, nil
}
func (s *OpsService) ListRetryAttemptsByErrorID(ctx context.Context, errorID int64, limit int) ([]*OpsRetryAttempt, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if errorID <= 0 {
return nil, infraerrors.BadRequest("OPS_ERROR_INVALID_ID", "invalid error id")
}
items, err := s.opsRepo.ListRetryAttemptsByErrorID(ctx, errorID, limit)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []*OpsRetryAttempt{}, nil
}
return nil, infraerrors.InternalServer("OPS_RETRY_LIST_FAILED", "Failed to list retry attempts").WithCause(err)
}
return items, nil
}
func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64) error {
func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64) error {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return err
}
@ -418,10 +357,10 @@ func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, r
}
return infraerrors.InternalServer("OPS_ERROR_LOAD_FAILED", "Failed to load ops error log").WithCause(err)
}
return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, resolvedRetryID, nil)
return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, nil)
}
func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) {
func sanitizeAndTrimJSONPayload(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) {
bytesLen = len(raw)
if len(raw) == 0 {
return "", false, 0
@ -429,7 +368,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
var decoded any
if err := json.Unmarshal(raw, &decoded); err != nil {
// If it's not valid JSON, don't store (retry would not be reliable anyway).
// If it is not valid JSON, fall back to the caller's non-JSON handling.
return "", false, bytesLen
}
@ -465,7 +404,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
// This avoids downstream code that expects certain top-level keys from crashing.
if root, ok := decoded.(map[string]any); ok {
placeholder := shallowCopyMap(root)
placeholder["request_body_truncated"] = true
placeholder["payload_truncated"] = true
// Replace potentially huge arrays/strings, but keep the keys present.
for _, k := range []string{"messages", "contents", "input", "prompt"} {
@ -488,7 +427,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
}
// Final fallback: minimal valid JSON.
encoded4, err4 := json.Marshal(map[string]any{"request_body_truncated": true})
encoded4, err4 := json.Marshal(map[string]any{"payload_truncated": true})
if err4 != nil {
return "", true, bytesLen
}
@ -732,7 +671,7 @@ func sanitizeErrorBodyForStorage(raw string, maxBytes int) (sanitized string, tr
}
// Prefer JSON-safe sanitization when possible.
if out, trunc, _ := sanitizeAndTrimRequestBody([]byte(raw), maxBytes); out != "" {
if out, trunc, _ := sanitizeAndTrimJSONPayload([]byte(raw), maxBytes); out != "" {
return out, trunc
}

View File

@ -31,11 +31,10 @@ func TestOpsServiceRecordErrorBatch_SanitizesAndBatches(t *testing.T) {
UpstreamErrorDetail: strPtr(detail),
UpstreamErrors: []*OpsUpstreamErrorEvent{
{
AccountID: -2,
UpstreamStatusCode: 429,
Message: " token leaked ",
Detail: `{"refresh_token":"secret"}`,
UpstreamRequestBody: `{"api_key":"secret","messages":[{"role":"user","content":"hello"}]}`,
AccountID: -2,
UpstreamStatusCode: 429,
Message: " token leaked ",
Detail: `{"refresh_token":"secret"}`,
},
},
},

View File

@ -1,60 +0,0 @@
package service
import (
"encoding/json"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestPrepareOpsRequestBodyForQueue_EmptyBody(t *testing.T) {
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(nil)
require.Nil(t, requestBodyJSON)
require.False(t, truncated)
require.Nil(t, requestBodyBytes)
}
func TestPrepareOpsRequestBodyForQueue_InvalidJSON(t *testing.T) {
raw := []byte("{invalid-json")
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
require.Nil(t, requestBodyJSON)
require.False(t, truncated)
require.NotNil(t, requestBodyBytes)
require.Equal(t, len(raw), *requestBodyBytes)
}
func TestPrepareOpsRequestBodyForQueue_RedactSensitiveFields(t *testing.T) {
raw := []byte(`{
"model":"claude-3-5-sonnet-20241022",
"api_key":"sk-test-123",
"headers":{"authorization":"Bearer secret-token"},
"messages":[{"role":"user","content":"hello"}]
}`)
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
require.NotNil(t, requestBodyJSON)
require.NotNil(t, requestBodyBytes)
require.False(t, truncated)
require.Equal(t, len(raw), *requestBodyBytes)
var body map[string]any
require.NoError(t, json.Unmarshal([]byte(*requestBodyJSON), &body))
require.Equal(t, "[REDACTED]", body["api_key"])
headers, ok := body["headers"].(map[string]any)
require.True(t, ok)
require.Equal(t, "[REDACTED]", headers["authorization"])
}
func TestPrepareOpsRequestBodyForQueue_LargeBodyTruncated(t *testing.T) {
largeMsg := strings.Repeat("x", opsMaxStoredRequestBodyBytes*2)
raw := []byte(`{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"` + largeMsg + `"}]}`)
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
require.NotNil(t, requestBodyJSON)
require.NotNil(t, requestBodyBytes)
require.True(t, truncated)
require.Equal(t, len(raw), *requestBodyBytes)
require.LessOrEqual(t, len(*requestBodyJSON), opsMaxStoredRequestBodyBytes)
require.Contains(t, *requestBodyJSON, "request_body_truncated")
}

View File

@ -45,11 +45,11 @@ func TestIsSensitiveKey_TokenBudgetKeysNotRedacted(t *testing.T) {
}
}
func TestSanitizeAndTrimRequestBody_PreservesTokenBudgetFields(t *testing.T) {
func TestSanitizeAndTrimJSONPayload_PreservesTokenBudgetFields(t *testing.T) {
t.Parallel()
raw := []byte(`{"model":"claude-3","max_tokens":123,"thinking":{"type":"enabled","budget_tokens":456},"access_token":"abc","messages":[{"role":"user","content":"hi"}]}`)
out, _, _ := sanitizeAndTrimRequestBody(raw, 10*1024)
out, _, _ := sanitizeAndTrimJSONPayload(raw, 10*1024)
if out == "" {
t.Fatalf("expected non-empty sanitized output")
}

View File

@ -16,11 +16,6 @@ const (
OpsUpstreamErrorDetailKey = "ops_upstream_error_detail"
OpsUpstreamErrorsKey = "ops_upstream_errors"
// Best-effort capture of the current upstream request body so ops can
// retry the specific upstream attempt (not just the client request).
// This value is sanitized+trimmed before being persisted.
OpsUpstreamRequestBodyKey = "ops_upstream_request_body"
// Optional stage latencies (milliseconds) for troubleshooting and alerting.
OpsAuthLatencyMsKey = "ops_auth_latency_ms"
OpsRoutingLatencyMsKey = "ops_routing_latency_ms"
@ -44,14 +39,6 @@ const (
OpsClientBusinessLimitedReasonIPRestriction = "api_key_ip_restriction"
)
func setOpsUpstreamRequestBody(c *gin.Context, body []byte) {
if c == nil || len(body) == 0 {
return
}
// 热路径避免 string(body) 额外分配,按需在落库前再转换。
c.Set(OpsUpstreamRequestBodyKey, body)
}
func SetOpsLatencyMs(c *gin.Context, key string, value int64) {
if c == nil || strings.TrimSpace(key) == "" || value < 0 {
return
@ -125,10 +112,6 @@ type OpsUpstreamErrorEvent struct {
// Helps debug 404/routing errors by showing which endpoint was targeted.
UpstreamURL string `json:"upstream_url,omitempty"`
// Best-effort upstream request capture (sanitized+trimmed).
// Required for retrying a specific upstream attempt.
UpstreamRequestBody string `json:"upstream_request_body,omitempty"`
// Best-effort upstream response capture (sanitized+trimmed).
UpstreamResponseBody string `json:"upstream_response_body,omitempty"`
@ -148,7 +131,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
}
ev.Platform = strings.TrimSpace(ev.Platform)
ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID)
ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody)
ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody)
ev.Kind = strings.TrimSpace(ev.Kind)
ev.UpstreamURL = strings.TrimSpace(ev.UpstreamURL)
@ -158,19 +140,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
ev.Message = sanitizeUpstreamErrorMessage(ev.Message)
}
// If the caller didn't explicitly pass upstream request body but the gateway
// stored it on the context, attach it so ops can retry this specific attempt.
if ev.UpstreamRequestBody == "" {
if v, ok := c.Get(OpsUpstreamRequestBodyKey); ok {
switch raw := v.(type) {
case string:
ev.UpstreamRequestBody = strings.TrimSpace(raw)
case []byte:
ev.UpstreamRequestBody = strings.TrimSpace(string(raw))
}
}
}
var existing []*OpsUpstreamErrorEvent
if v, ok := c.Get(OpsUpstreamErrorsKey); ok {
if arr, ok := v.([]*OpsUpstreamErrorEvent); ok {

View File

@ -1,10 +1,8 @@
package service
import (
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
@ -28,41 +26,3 @@ func TestSafeUpstreamURL(t *testing.T) {
})
}
}
func TestAppendOpsUpstreamError_UsesRequestBodyBytesFromContext(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
setOpsUpstreamRequestBody(c, []byte(`{"model":"gpt-5"}`))
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Kind: "http_error",
Message: "upstream failed",
})
v, ok := c.Get(OpsUpstreamErrorsKey)
require.True(t, ok)
events, ok := v.([]*OpsUpstreamErrorEvent)
require.True(t, ok)
require.Len(t, events, 1)
require.Equal(t, `{"model":"gpt-5"}`, events[0].UpstreamRequestBody)
}
func TestAppendOpsUpstreamError_UsesRequestBodyStringFromContext(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Set(OpsUpstreamRequestBodyKey, `{"model":"gpt-4"}`)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Kind: "request_error",
Message: "dial timeout",
})
v, ok := c.Get(OpsUpstreamErrorsKey)
require.True(t, ok)
events, ok := v.([]*OpsUpstreamErrorEvent)
require.True(t, ok)
require.Len(t, events, 1)
require.Equal(t, `{"model":"gpt-4"}`, events[0].UpstreamRequestBody)
}

View File

@ -0,0 +1,10 @@
package service
func containsInt64(values []int64, target int64) bool {
for _, v := range values {
if v == target {
return true
}
}
return false
}

View File

@ -0,0 +1,16 @@
-- Remove unused Ops retry/replay storage.
-- The retry endpoints are no longer exposed, so keeping request bodies and
-- retry audit rows only increases write width, memory retention, and DB size.
DROP TABLE IF EXISTS ops_retry_attempts CASCADE;
ALTER TABLE ops_error_logs
DROP COLUMN IF EXISTS request_body,
DROP COLUMN IF EXISTS request_headers,
DROP COLUMN IF EXISTS request_body_truncated,
DROP COLUMN IF EXISTS request_body_bytes,
DROP COLUMN IF EXISTS is_retryable,
DROP COLUMN IF EXISTS retry_count,
DROP COLUMN IF EXISTS resolved_retry_id;
COMMENT ON TABLE ops_error_logs IS 'Ops error logs (vNext). Stores sanitized error details; request replay storage removed.';

View File

@ -1,52 +1,18 @@
/**
* Admin Ops API endpoints (vNext)
* - Error logs list/detail + retry (client/upstream)
* - Error logs list/detail
* - Dashboard overview (raw path)
*/
import { apiClient } from '../client'
import type { PaginatedResponse } from '@/types'
export type OpsRetryMode = 'client' | 'upstream'
export type OpsQueryMode = 'auto' | 'raw' | 'preagg'
export interface OpsRequestOptions {
signal?: AbortSignal
}
export interface OpsRetryRequest {
mode: OpsRetryMode
pinned_account_id?: number
force?: boolean
}
export interface OpsRetryAttempt {
id: number
created_at: string
requested_by_user_id: number
source_error_id: number
mode: string
pinned_account_id?: number | null
pinned_account_name?: string
status: string
started_at?: string | null
finished_at?: string | null
duration_ms?: number | null
success?: boolean | null
http_status_code?: number | null
upstream_request_id?: string | null
used_account_id?: number | null
used_account_name?: string
response_preview?: string | null
response_truncated?: boolean | null
result_request_id?: string | null
result_error_id?: number | null
error_message?: string | null
}
export type OpsUpstreamErrorEvent = {
at_unix_ms?: number
platform?: string
@ -54,33 +20,11 @@ export type OpsUpstreamErrorEvent = {
account_name?: string
upstream_status_code?: number
upstream_request_id?: string
upstream_request_body?: string
kind?: string
message?: string
detail?: string
}
export interface OpsRetryResult {
attempt_id: number
mode: OpsRetryMode
status: 'running' | 'succeeded' | 'failed' | string
pinned_account_id?: number | null
used_account_id?: number | null
http_status_code: number
upstream_request_id: string
response_preview: string
response_truncated: boolean
error_message: string
started_at: string
finished_at: string
duration_ms: number
}
export interface OpsDashboardOverview {
start_time: string
end_time: string
@ -946,13 +890,9 @@ export interface OpsErrorLog {
platform: string
model: string
is_retryable: boolean
retry_count: number
resolved: boolean
resolved_at?: string | null
resolved_by_user_id?: number | null
resolved_retry_id?: number | null
client_request_id: string
request_id: string
@ -994,10 +934,6 @@ export interface OpsErrorDetail extends OpsErrorLog {
response_latency_ms?: number | null
time_to_first_token_ms?: number | null
request_body: string
request_body_truncated: boolean
request_body_bytes?: number | null
is_business_limited: boolean
}
@ -1156,16 +1092,6 @@ export async function getErrorLogDetail(id: number): Promise<OpsErrorDetail> {
return data
}
export async function retryErrorRequest(id: number, req: OpsRetryRequest): Promise<OpsRetryResult> {
const { data } = await apiClient.post<OpsRetryResult>(`/admin/ops/errors/${id}/retry`, req)
return data
}
export async function listRetryAttempts(errorId: number, limit = 50): Promise<OpsRetryAttempt[]> {
const { data } = await apiClient.get<OpsRetryAttempt[]>(`/admin/ops/errors/${errorId}/retries`, { params: { limit } })
return data
}
export async function updateErrorResolved(errorId: number, resolved: boolean): Promise<void> {
await apiClient.put(`/admin/ops/errors/${errorId}/resolve`, { resolved })
}
@ -1191,21 +1117,6 @@ export async function getUpstreamErrorDetail(id: number): Promise<OpsErrorDetail
return data
}
export async function retryRequestErrorClient(id: number): Promise<OpsRetryResult> {
const { data } = await apiClient.post<OpsRetryResult>(`/admin/ops/request-errors/${id}/retry-client`, {})
return data
}
export async function retryRequestErrorUpstreamEvent(id: number, idx: number): Promise<OpsRetryResult> {
const { data } = await apiClient.post<OpsRetryResult>(`/admin/ops/request-errors/${id}/upstream-errors/${idx}/retry`, {})
return data
}
export async function retryUpstreamError(id: number): Promise<OpsRetryResult> {
const { data } = await apiClient.post<OpsRetryResult>(`/admin/ops/upstream-errors/${id}/retry`, {})
return data
}
export async function updateRequestErrorResolved(errorId: number, resolved: boolean): Promise<void> {
await apiClient.put(`/admin/ops/request-errors/${errorId}/resolve`, { resolved })
}
@ -1380,8 +1291,6 @@ export const opsAPI = {
// Legacy unified endpoints
listErrorLogs,
getErrorLogDetail,
retryErrorRequest,
listRetryAttempts,
updateErrorResolved,
// New split endpoints
@ -1389,9 +1298,6 @@ export const opsAPI = {
listUpstreamErrors,
getRequestErrorDetail,
getUpstreamErrorDetail,
retryRequestErrorClient,
retryRequestErrorUpstreamEvent,
retryUpstreamError,
updateRequestErrorResolved,
updateUpstreamErrorResolved,
listRequestErrorUpstreamErrors,

View File

@ -4636,20 +4636,13 @@ export default {
titleWithId: 'Error #{id}',
noErrorSelected: 'No error selected.',
resolution: 'Resolved:',
pinnedToOriginalAccountId: 'Pinned to original account_id',
missingUpstreamRequestBody: 'Missing upstream request body',
failedToLoadRetryHistory: 'Failed to load retry history',
failedToUpdateResolvedStatus: 'Failed to update resolved status',
unsupportedRetryMode: 'Unsupported retry mode',
classificationKeys: {
phase: 'Phase',
owner: 'Owner',
source: 'Source',
retryable: 'Retryable',
resolvedAt: 'Resolved At',
resolvedBy: 'Resolved By',
resolvedRetryId: 'Resolved Retry',
retryCount: 'Retry Count'
resolvedBy: 'Resolved By'
},
source: {
upstream_http: 'Upstream HTTP'
@ -4669,11 +4662,6 @@ export default {
expand: 'Response (click to expand)',
collapse: 'Response (click to collapse)'
},
retryMeta: {
used: 'Used',
success: 'Success',
pinned: 'Pinned'
},
loading: 'Loading…',
requestId: 'Request ID',
time: 'Time',
@ -4705,48 +4693,18 @@ export default {
upstream: 'Upstream',
response: 'Response',
classification: 'Classification',
notRetryable: 'Not recommended to retry',
retry: 'Retry',
retryClient: 'Retry (Client)',
retryUpstream: 'Retry (Upstream pinned)',
pinnedAccountId: 'Pinned account_id',
retryNotes: 'Retry Notes',
requestBody: 'Request Body',
errorBody: 'Error Body',
trimmed: 'trimmed',
confirmRetry: 'Confirm Retry',
retrySuccess: 'Retry succeeded',
retryFailed: 'Retry failed',
retryHint: 'Retry will resend the request with the same parameters',
retryClientHint: 'Use client retry (no account pinning)',
retryUpstreamHint: 'Use upstream pinned retry (pin to the error account)',
pinnedAccountIdHint: '(auto from error log)',
retryNote1: 'Retry will use the same request body and parameters',
retryNote2: 'If the original request failed due to account issues, pinned retry may still fail',
retryNote3: 'Client retry will reselect an account',
retryNote4: 'You can force retry for non-retryable errors, but it is not recommended',
confirmRetryMessage: 'Confirm retry this request?',
confirmRetryHint: 'Will resend with the same request parameters',
forceRetry: 'I understand and want to force retry',
forceRetryHint: 'This error usually cannot be fixed by retry; check to proceed',
forceRetryNeedAck: 'Please check to force retry',
markResolved: 'Mark resolved',
markUnresolved: 'Mark unresolved',
viewRetries: 'Retry history',
retryHistory: 'Retry History',
tabOverview: 'Overview',
tabRetries: 'Retries',
tabRequest: 'Request',
tabResponse: 'Response',
responseBody: 'Response',
compareA: 'Compare A',
compareB: 'Compare B',
retrySummary: 'Retry Summary',
responseHintSucceeded: 'Showing succeeded retry response_preview (#{id})',
responseHintFallback: 'No succeeded retry found; showing stored error_body',
suggestion: 'Suggestion',
suggestUpstreamResolved: '✓ Upstream error resolved by retry; no action needed',
suggestUpstream: 'Upstream instability: check account status, consider switching accounts, or retry',
suggestUpstream: 'Upstream instability: check account status or consider switching accounts',
suggestRequest: 'Client request error: ask customer to fix request parameters',
suggestAuth: 'Auth failed: verify API key/credentials',
suggestPlatform: 'Platform error: prioritize investigation and fix',

View File

@ -4798,20 +4798,13 @@ export default {
titleWithId: '错误 #{id}',
noErrorSelected: '未选择错误。',
resolution: '已解决:',
pinnedToOriginalAccountId: '固定到原 account_id',
missingUpstreamRequestBody: '缺少上游请求体',
failedToLoadRetryHistory: '加载重试历史失败',
failedToUpdateResolvedStatus: '更新解决状态失败',
unsupportedRetryMode: '不支持的重试模式',
classificationKeys: {
phase: '阶段',
owner: '归属方',
source: '来源',
retryable: '可重试',
resolvedAt: '解决时间',
resolvedBy: '解决人',
resolvedRetryId: '解决重试ID',
retryCount: '重试次数'
resolvedBy: '解决人'
},
source: {
upstream_http: '上游 HTTP'
@ -4831,11 +4824,6 @@ export default {
expand: '响应内容(点击展开)',
collapse: '响应内容(点击收起)'
},
retryMeta: {
used: '使用账号',
success: '成功',
pinned: '固定账号'
},
loading: '加载中…',
requestId: '请求 ID',
time: '时间',
@ -4867,48 +4855,18 @@ export default {
upstream: '上游',
response: '响应',
classification: '错误分类',
notRetryable: '此错误不建议重试',
retry: '重试',
retryClient: '重试(客户端)',
retryUpstream: '重试(上游固定)',
pinnedAccountId: '固定 account_id',
retryNotes: '重试说明',
requestBody: '请求体',
errorBody: '错误体',
trimmed: '已截断',
confirmRetry: '确认重试',
retrySuccess: '重试成功',
retryFailed: '重试失败',
retryHint: '重试将使用相同的请求参数重新发送请求',
retryClientHint: '使用客户端重试(不固定账号)',
retryUpstreamHint: '使用上游固定重试(固定到错误的账号)',
pinnedAccountIdHint: '(自动从错误日志获取)',
retryNote1: '重试会使用相同的请求体和参数',
retryNote2: '如果原请求失败是因为账号问题,固定重试可能仍会失败',
retryNote3: '客户端重试会重新选择账号',
retryNote4: '对不可重试的错误可以强制重试,但不推荐',
confirmRetryMessage: '确认要重试该请求吗?',
confirmRetryHint: '将使用相同的请求参数重新发送',
forceRetry: '我已确认并理解强制重试风险',
forceRetryHint: '此错误类型通常不可通过重试解决;如仍需重试请勾选确认',
forceRetryNeedAck: '请先勾选确认再强制重试',
markResolved: '标记已解决',
markUnresolved: '标记未解决',
viewRetries: '重试历史',
retryHistory: '重试历史',
tabOverview: '概览',
tabRetries: '重试历史',
tabRequest: '请求详情',
tabResponse: '响应详情',
responseBody: '响应详情',
compareA: '对比 A',
compareB: '对比 B',
retrySummary: '重试摘要',
responseHintSucceeded: '展示重试成功的 response_preview#{id}',
responseHintFallback: '没有成功的重试结果,展示存储的 error_body',
suggestion: '处理建议',
suggestUpstreamResolved: '✓ 上游错误已通过重试解决,无需人工介入',
suggestUpstream: '⚠️ 上游服务不稳定,建议:检查上游账号状态 / 考虑切换账号 / 再次重试',
suggestUpstream: '⚠️ 上游服务不稳定,建议:检查上游账号状态 / 考虑切换账号',
suggestRequest: '⚠️ 客户端请求错误,建议:联系客户修正请求参数 / 手动标记已解决',
suggestAuth: '⚠️ 认证失败,建议:检查 API Key 是否有效 / 联系客户更新凭证',
suggestPlatform: '🚨 平台错误,建议立即排查修复',

View File

@ -14,8 +14,6 @@ function makeDetail(overrides: Partial<OpsErrorDetail>): OpsErrorDetail {
status_code: 502,
platform: 'openai',
model: 'gpt-4o-mini',
is_retryable: true,
retry_count: 0,
resolved: false,
client_request_id: 'crid-1',
request_id: 'rid-1',
@ -25,8 +23,6 @@ function makeDetail(overrides: Partial<OpsErrorDetail>): OpsErrorDetail {
group_name: 'group',
error_body: '',
user_agent: '',
request_body: '',
request_body_truncated: false,
is_business_limited: false,
...overrides
}