Remove ops retry replay storage
This commit is contained in:
parent
8927ab091e
commit
2eb622f2f6
@ -1,9 +1,7 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -384,79 +382,6 @@ func (h *OpsHandler) ListRequestErrorUpstreamErrors(c *gin.Context) {
|
||||
response.Paginated(c, result.Errors, int64(result.Total), result.Page, result.PageSize)
|
||||
}
|
||||
|
||||
// RetryRequestErrorClient retries the client request based on stored request body.
|
||||
// POST /api/v1/admin/ops/request-errors/:id/retry-client
|
||||
func (h *OpsHandler) RetryRequestErrorClient(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
subject, ok := middleware.GetAuthSubjectFromContext(c)
|
||||
if !ok || subject.UserID <= 0 {
|
||||
response.Error(c, http.StatusUnauthorized, "Unauthorized")
|
||||
return
|
||||
}
|
||||
|
||||
idStr := strings.TrimSpace(c.Param("id"))
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid error id")
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeClient, nil)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
response.Success(c, result)
|
||||
}
|
||||
|
||||
// RetryRequestErrorUpstreamEvent retries a specific upstream attempt using captured upstream_request_body.
|
||||
// POST /api/v1/admin/ops/request-errors/:id/upstream-errors/:idx/retry
|
||||
func (h *OpsHandler) RetryRequestErrorUpstreamEvent(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
subject, ok := middleware.GetAuthSubjectFromContext(c)
|
||||
if !ok || subject.UserID <= 0 {
|
||||
response.Error(c, http.StatusUnauthorized, "Unauthorized")
|
||||
return
|
||||
}
|
||||
|
||||
idStr := strings.TrimSpace(c.Param("id"))
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid error id")
|
||||
return
|
||||
}
|
||||
|
||||
idxStr := strings.TrimSpace(c.Param("idx"))
|
||||
idx, err := strconv.Atoi(idxStr)
|
||||
if err != nil || idx < 0 {
|
||||
response.BadRequest(c, "Invalid upstream idx")
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.opsService.RetryUpstreamEvent(c.Request.Context(), subject.UserID, id, idx)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
response.Success(c, result)
|
||||
}
|
||||
|
||||
// ResolveRequestError toggles resolved status.
|
||||
// PUT /api/v1/admin/ops/request-errors/:id/resolve
|
||||
func (h *OpsHandler) ResolveRequestError(c *gin.Context) {
|
||||
@ -564,39 +489,6 @@ func (h *OpsHandler) GetUpstreamError(c *gin.Context) {
|
||||
h.GetErrorLogByID(c)
|
||||
}
|
||||
|
||||
// RetryUpstreamError retries upstream error using the original account_id.
|
||||
// POST /api/v1/admin/ops/upstream-errors/:id/retry
|
||||
func (h *OpsHandler) RetryUpstreamError(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
subject, ok := middleware.GetAuthSubjectFromContext(c)
|
||||
if !ok || subject.UserID <= 0 {
|
||||
response.Error(c, http.StatusUnauthorized, "Unauthorized")
|
||||
return
|
||||
}
|
||||
|
||||
idStr := strings.TrimSpace(c.Param("id"))
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid error id")
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, service.OpsRetryModeUpstream, nil)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
response.Success(c, result)
|
||||
}
|
||||
|
||||
// ResolveUpstreamError toggles resolved status.
|
||||
// PUT /api/v1/admin/ops/upstream-errors/:id/resolve
|
||||
func (h *OpsHandler) ResolveUpstreamError(c *gin.Context) {
|
||||
@ -706,106 +598,10 @@ func (h *OpsHandler) ListRequestDetails(c *gin.Context) {
|
||||
response.Paginated(c, out.Items, out.Total, out.Page, out.PageSize)
|
||||
}
|
||||
|
||||
type (
	// opsRetryRequest is the optional JSON body accepted by the retry endpoint.
	opsRetryRequest struct {
		// Mode selects the retry mode; the handler defaults it when blank.
		Mode string `json:"mode"`
		// PinnedAccountID is optional; nil means the field was not supplied.
		PinnedAccountID *int64 `json:"pinned_account_id"`
		// Force is decoded from the body; the retry handler does not act on it.
		Force bool `json:"force"`
	}

	// opsResolveRequest is the JSON body carrying the desired resolved flag.
	opsResolveRequest struct {
		Resolved bool `json:"resolved"`
	}
)
|
||||
|
||||
// RetryErrorRequest retries a failed request using stored request_body.
|
||||
// POST /api/v1/admin/ops/errors/:id/retry
|
||||
func (h *OpsHandler) RetryErrorRequest(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
subject, ok := middleware.GetAuthSubjectFromContext(c)
|
||||
if !ok || subject.UserID <= 0 {
|
||||
response.Error(c, http.StatusUnauthorized, "Unauthorized")
|
||||
return
|
||||
}
|
||||
|
||||
idStr := strings.TrimSpace(c.Param("id"))
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid error id")
|
||||
return
|
||||
}
|
||||
|
||||
req := opsRetryRequest{Mode: service.OpsRetryModeClient}
|
||||
if err := c.ShouldBindJSON(&req); err != nil && !errors.Is(err, io.EOF) {
|
||||
response.BadRequest(c, "Invalid request: "+err.Error())
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(req.Mode) == "" {
|
||||
req.Mode = service.OpsRetryModeClient
|
||||
}
|
||||
|
||||
// Force flag is currently a UI-level acknowledgement. Server may still enforce safety constraints.
|
||||
_ = req.Force
|
||||
|
||||
// Legacy endpoint safety: only allow retrying the client request here.
|
||||
// Upstream retries must go through the split endpoints.
|
||||
if strings.EqualFold(strings.TrimSpace(req.Mode), service.OpsRetryModeUpstream) {
|
||||
response.BadRequest(c, "upstream retry is not supported on this endpoint")
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.opsService.RetryError(c.Request.Context(), subject.UserID, id, req.Mode, req.PinnedAccountID)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
response.Success(c, result)
|
||||
}
|
||||
|
||||
// ListRetryAttempts lists retry attempts for an error log.
|
||||
// GET /api/v1/admin/ops/errors/:id/retries
|
||||
func (h *OpsHandler) ListRetryAttempts(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
idStr := strings.TrimSpace(c.Param("id"))
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid error id")
|
||||
return
|
||||
}
|
||||
|
||||
limit := 50
|
||||
if v := strings.TrimSpace(c.Query("limit")); v != "" {
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil || n <= 0 {
|
||||
response.BadRequest(c, "Invalid limit")
|
||||
return
|
||||
}
|
||||
limit = n
|
||||
}
|
||||
|
||||
items, err := h.opsService.ListRetryAttemptsByErrorID(c.Request.Context(), id, limit)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
response.Success(c, items)
|
||||
}
|
||||
|
||||
// UpdateErrorResolution allows manual resolve/unresolve.
|
||||
// PUT /api/v1/admin/ops/errors/:id/resolve
|
||||
func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) {
|
||||
@ -837,7 +633,7 @@ func (h *OpsHandler) UpdateErrorResolution(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
uid := subject.UserID
|
||||
if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid, nil); err != nil {
|
||||
if err := h.opsService.UpdateErrorResolution(c.Request.Context(), id, req.Resolved, &uid); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -151,7 +151,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
|
||||
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
|
||||
if err != nil {
|
||||
@ -184,7 +184,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
|
||||
c.Request = c.Request.WithContext(service.WithThinkingEnabled(c.Request.Context(), parsedReq.ThinkingEnabled, h.metadataBridgeEnabled()))
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
// 验证 model 必填
|
||||
@ -1512,7 +1512,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
|
||||
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
|
||||
if err != nil {
|
||||
@ -1531,7 +1531,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream, body)
|
||||
setOpsRequestContext(c, parsedReq.Model, parsedReq.Stream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsedReq.Stream, false)))
|
||||
|
||||
// 获取订阅信息(可能为nil)
|
||||
|
||||
@ -60,7 +60,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
|
||||
// Validate JSON
|
||||
if !gjson.ValidBytes(body) {
|
||||
@ -78,7 +78,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
|
||||
reqStream := gjson.GetBytes(body, "stream").Bool()
|
||||
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
// 解析渠道级模型映射
|
||||
|
||||
@ -60,7 +60,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
|
||||
// Validate JSON
|
||||
if !gjson.ValidBytes(body) {
|
||||
@ -78,7 +78,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
|
||||
reqStream := gjson.GetBytes(body, "stream").Bool()
|
||||
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
// 解析渠道级模型映射
|
||||
|
||||
@ -184,7 +184,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, modelName, stream, body)
|
||||
setOpsRequestContext(c, modelName, stream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(stream, false)))
|
||||
|
||||
if decision := h.checkContentModeration(c, reqLog, apiKey, authSubject, service.ContentModerationProtocolGemini, modelName, body); decision != nil && decision.Blocked {
|
||||
|
||||
@ -78,7 +78,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
|
||||
|
||||
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIChat, reqModel, body); decision != nil && decision.Blocked {
|
||||
|
||||
@ -130,7 +130,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
sessionHashBody := body
|
||||
if service.IsOpenAIResponsesCompactPathForTest(c) {
|
||||
if compactSeed := strings.TrimSpace(gjson.GetBytes(body, "prompt_cache_key").String()); compactSeed != "" {
|
||||
@ -189,7 +189,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, body); decision != nil && decision.Blocked {
|
||||
@ -611,7 +611,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
|
||||
|
||||
reqLog = reqLog.With(zap.String("model", reqModel), zap.Bool("stream", reqStream))
|
||||
|
||||
setOpsRequestContext(c, reqModel, reqStream, body)
|
||||
setOpsRequestContext(c, reqModel, reqStream)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(reqStream, false)))
|
||||
|
||||
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolAnthropicMessages, reqModel, body); decision != nil && decision.Blocked {
|
||||
@ -1174,7 +1174,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) {
|
||||
zap.Bool("has_previous_response_id", previousResponseID != ""),
|
||||
zap.String("previous_response_id_kind", previousResponseIDKind),
|
||||
)
|
||||
setOpsRequestContext(c, reqModel, true, firstMessage)
|
||||
setOpsRequestContext(c, reqModel, true)
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeWSV2))
|
||||
|
||||
if decision := h.checkContentModeration(c, reqLog, apiKey, subject, service.ContentModerationProtocolOpenAIResponses, reqModel, firstMessage); decision != nil && decision.Blocked {
|
||||
|
||||
@ -63,9 +63,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
|
||||
}
|
||||
|
||||
if isMultipartImagesContentType(c.GetHeader("Content-Type")) {
|
||||
setOpsRequestContext(c, "", false, nil)
|
||||
setOpsRequestContext(c, "", false)
|
||||
} else {
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
setOpsRequestContext(c, "", false)
|
||||
}
|
||||
|
||||
parsed, err := h.gatewayService.ParseOpenAIImagesRequest(c, body)
|
||||
@ -98,9 +98,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
|
||||
}
|
||||
|
||||
if parsed.Multipart {
|
||||
setOpsRequestContext(c, parsed.Model, parsed.Stream, nil)
|
||||
setOpsRequestContext(c, parsed.Model, parsed.Stream)
|
||||
} else {
|
||||
setOpsRequestContext(c, parsed.Model, parsed.Stream, body)
|
||||
setOpsRequestContext(c, parsed.Model, parsed.Stream)
|
||||
}
|
||||
setOpsEndpointContext(c, "", int16(service.RequestTypeFromLegacy(parsed.Stream, false)))
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@ import (
|
||||
const (
|
||||
opsModelKey = "ops_model"
|
||||
opsStreamKey = "ops_stream"
|
||||
opsRequestBodyKey = "ops_request_body"
|
||||
opsAccountIDKey = "ops_account_id"
|
||||
opsRoutingCapacityLimitedKey = "ops_routing_capacity_limited"
|
||||
|
||||
@ -336,16 +335,13 @@ func opsErrorLogConfig() (workerCount int, queueSize int) {
|
||||
return workerCount, queueSize
|
||||
}
|
||||
|
||||
func setOpsRequestContext(c *gin.Context, model string, stream bool, requestBody []byte) {
|
||||
func setOpsRequestContext(c *gin.Context, model string, stream bool) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
model = strings.TrimSpace(model)
|
||||
c.Set(opsModelKey, model)
|
||||
c.Set(opsStreamKey, stream)
|
||||
if len(requestBody) > 0 {
|
||||
c.Set(opsRequestBodyKey, requestBody)
|
||||
}
|
||||
if c.Request != nil && model != "" {
|
||||
ctx := context.WithValue(c.Request.Context(), ctxkey.Model, model)
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
@ -364,22 +360,6 @@ func setOpsEndpointContext(c *gin.Context, upstreamModel string, requestType int
|
||||
c.Set(opsRequestTypeKey, requestType)
|
||||
}
|
||||
|
||||
func attachOpsRequestBodyToEntry(c *gin.Context, entry *service.OpsInsertErrorLogInput) {
|
||||
if c == nil || entry == nil {
|
||||
return
|
||||
}
|
||||
v, ok := c.Get(opsRequestBodyKey)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
raw, ok := v.([]byte)
|
||||
if !ok || len(raw) == 0 {
|
||||
return
|
||||
}
|
||||
entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = service.PrepareOpsRequestBodyForQueue(raw)
|
||||
opsErrorLogSanitized.Add(1)
|
||||
}
|
||||
|
||||
func setOpsSelectedAccount(c *gin.Context, accountID int64, platform ...string) {
|
||||
if c == nil || accountID <= 0 {
|
||||
return
|
||||
@ -711,7 +691,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
|
||||
ErrorPhase: "upstream",
|
||||
ErrorType: "upstream_error",
|
||||
// Severity/retryability should reflect the upstream failure, not the final client status (200).
|
||||
// Severity should reflect the upstream failure, not the final client status (200).
|
||||
Severity: classifyOpsSeverity("upstream_error", effectiveUpstreamStatus),
|
||||
StatusCode: status,
|
||||
IsBusinessLimited: false,
|
||||
@ -728,9 +708,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
UpstreamErrorDetail: upstreamErrorDetail,
|
||||
UpstreamErrors: events,
|
||||
|
||||
IsRetryable: classifyOpsIsRetryable("upstream_error", effectiveUpstreamStatus),
|
||||
RetryCount: 0,
|
||||
CreatedAt: time.Now(),
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
applyOpsLatencyFieldsFromContext(c, entry)
|
||||
|
||||
@ -754,10 +732,6 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
entry.ClientIP = &clientIP
|
||||
}
|
||||
|
||||
// Store request headers/body only when an upstream error occurred to keep overhead minimal.
|
||||
entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c)
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
|
||||
// Skip logging if a passthrough rule with skip_monitoring=true matched.
|
||||
if v, ok := c.Get(service.OpsSkipPassthroughKey); ok {
|
||||
if skip, _ := v.(bool); skip {
|
||||
@ -870,9 +844,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
ErrorSource: errorSource,
|
||||
ErrorOwner: errorOwner,
|
||||
|
||||
IsRetryable: classifyOpsIsRetryable(normalizedType, status),
|
||||
RetryCount: 0,
|
||||
CreatedAt: time.Now(),
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
applyOpsLatencyFieldsFromContext(c, entry)
|
||||
|
||||
@ -950,20 +922,10 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
entry.ClientIP = &clientIP
|
||||
}
|
||||
|
||||
// Persist only a minimal, whitelisted set of request headers to improve retry fidelity.
|
||||
// Do NOT store Authorization/Cookie/etc.
|
||||
entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c)
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
|
||||
enqueueOpsErrorLog(ops, entry)
|
||||
}
|
||||
}
|
||||
|
||||
// opsRetryRequestHeaderAllowlist names the only request headers that
// extractOpsRetryRequestHeaders copies into an ops error log entry.
var opsRetryRequestHeaderAllowlist = []string{"anthropic-beta", "anthropic-version"}
|
||||
|
||||
// isCountTokensRequest checks if the request is a count_tokens request
|
||||
func isCountTokensRequest(c *gin.Context) bool {
|
||||
if c == nil || c.Request == nil || c.Request.URL == nil {
|
||||
@ -972,32 +934,6 @@ func isCountTokensRequest(c *gin.Context) bool {
|
||||
return strings.Contains(c.Request.URL.Path, "/count_tokens")
|
||||
}
|
||||
|
||||
func extractOpsRetryRequestHeaders(c *gin.Context) *string {
|
||||
if c == nil || c.Request == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
headers := make(map[string]string, 4)
|
||||
for _, key := range opsRetryRequestHeaderAllowlist {
|
||||
v := strings.TrimSpace(c.GetHeader(key))
|
||||
if v == "" {
|
||||
continue
|
||||
}
|
||||
// Keep headers small even if a client sends something unexpected.
|
||||
headers[key] = truncateString(v, 512)
|
||||
}
|
||||
if len(headers) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
raw, err := json.Marshal(headers)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
s := string(raw)
|
||||
return &s
|
||||
}
|
||||
|
||||
func applyOpsLatencyFieldsFromContext(c *gin.Context, entry *service.OpsInsertErrorLogInput) {
|
||||
if c == nil || entry == nil {
|
||||
return
|
||||
@ -1199,24 +1135,6 @@ func classifyOpsSeverity(errType string, status int) string {
|
||||
return "P3"
|
||||
}
|
||||
|
||||
// classifyOpsIsRetryable reports whether an error of the given type and HTTP
// status is treated as retryable.
//
//   - timeout_error and rate_limit_error: always retryable (rate limits may be
//     transient, whether upstream or queue related).
//   - authentication_error, invalid_request_error, billing_error and
//     subscription_error: never retryable.
//   - upstream_error and overloaded_error: retryable for any status >= 500, and
//     for 429 and 529.
//   - any other type: retryable only for status >= 500.
func classifyOpsIsRetryable(errType string, statusCode int) bool {
	serverSide := statusCode >= 500
	switch errType {
	case "timeout_error", "rate_limit_error":
		return true
	case "authentication_error", "invalid_request_error", "billing_error", "subscription_error":
		return false
	case "upstream_error", "overloaded_error":
		return serverSide || statusCode == 429 || statusCode == 529
	}
	return serverSide
}
|
||||
|
||||
func classifyOpsErrorLog(c *gin.Context, errType, message, code string, status int) (phase string, isBusinessLimited bool, errorOwner string, errorSource string) {
|
||||
phase = classifyOpsPhase(errType, message, code)
|
||||
routingCapacityLimited := isOpsRoutingCapacityLimited(c)
|
||||
|
||||
@ -44,49 +44,6 @@ func resetOpsErrorLoggerStateForTest(t *testing.T) {
|
||||
opsErrorLogDrained.Store(false)
|
||||
}
|
||||
|
||||
func TestAttachOpsRequestBodyToEntry_SanitizeAndTrim(t *testing.T) {
|
||||
resetOpsErrorLoggerStateForTest(t)
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
|
||||
|
||||
raw := []byte(`{"access_token":"secret-token","messages":[{"role":"user","content":"hello"}]}`)
|
||||
setOpsRequestContext(c, "claude-3", false, raw)
|
||||
|
||||
entry := &service.OpsInsertErrorLogInput{}
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
|
||||
require.NotNil(t, entry.RequestBodyBytes)
|
||||
require.Equal(t, len(raw), *entry.RequestBodyBytes)
|
||||
require.NotNil(t, entry.RequestBodyJSON)
|
||||
require.NotContains(t, *entry.RequestBodyJSON, "secret-token")
|
||||
require.Contains(t, *entry.RequestBodyJSON, "[REDACTED]")
|
||||
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
|
||||
}
|
||||
|
||||
func TestAttachOpsRequestBodyToEntry_InvalidJSONKeepsSize(t *testing.T) {
|
||||
resetOpsErrorLoggerStateForTest(t)
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
|
||||
|
||||
raw := []byte("not-json")
|
||||
setOpsRequestContext(c, "claude-3", false, raw)
|
||||
|
||||
entry := &service.OpsInsertErrorLogInput{}
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
|
||||
require.Nil(t, entry.RequestBodyJSON)
|
||||
require.NotNil(t, entry.RequestBodyBytes)
|
||||
require.Equal(t, len(raw), *entry.RequestBodyBytes)
|
||||
require.False(t, entry.RequestBodyTruncated)
|
||||
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
|
||||
}
|
||||
|
||||
func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) {
|
||||
resetOpsErrorLoggerStateForTest(t)
|
||||
|
||||
@ -108,39 +65,6 @@ func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) {
|
||||
require.Equal(t, int64(1), OpsErrorLogQueueLength())
|
||||
}
|
||||
|
||||
func TestAttachOpsRequestBodyToEntry_EarlyReturnBranches(t *testing.T) {
|
||||
resetOpsErrorLoggerStateForTest(t)
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
entry := &service.OpsInsertErrorLogInput{}
|
||||
attachOpsRequestBodyToEntry(nil, entry)
|
||||
attachOpsRequestBodyToEntry(&gin.Context{}, nil)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
|
||||
|
||||
// 无请求体 key
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
require.Nil(t, entry.RequestBodyJSON)
|
||||
require.Nil(t, entry.RequestBodyBytes)
|
||||
require.False(t, entry.RequestBodyTruncated)
|
||||
|
||||
// 错误类型
|
||||
c.Set(opsRequestBodyKey, "not-bytes")
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
require.Nil(t, entry.RequestBodyJSON)
|
||||
require.Nil(t, entry.RequestBodyBytes)
|
||||
|
||||
// 空 bytes
|
||||
c.Set(opsRequestBodyKey, []byte{})
|
||||
attachOpsRequestBodyToEntry(c, entry)
|
||||
require.Nil(t, entry.RequestBodyJSON)
|
||||
require.Nil(t, entry.RequestBodyBytes)
|
||||
|
||||
require.Equal(t, int64(0), OpsErrorLogSanitizedTotal())
|
||||
}
|
||||
|
||||
func TestEnqueueOpsErrorLog_EarlyReturnBranches(t *testing.T) {
|
||||
resetOpsErrorLoggerStateForTest(t)
|
||||
|
||||
|
||||
@ -54,15 +54,9 @@ INSERT INTO ops_error_logs (
|
||||
upstream_latency_ms,
|
||||
response_latency_ms,
|
||||
time_to_first_token_ms,
|
||||
request_body,
|
||||
request_body_truncated,
|
||||
request_body_bytes,
|
||||
request_headers,
|
||||
is_retryable,
|
||||
retry_count,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39,$40,$41,$42,$43
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37
|
||||
)`
|
||||
|
||||
func NewOpsRepository(db *sql.DB) service.OpsRepository {
|
||||
@ -170,12 +164,6 @@ func opsInsertErrorLogArgs(input *service.OpsInsertErrorLogInput) []any {
|
||||
opsNullInt64(input.UpstreamLatencyMs),
|
||||
opsNullInt64(input.ResponseLatencyMs),
|
||||
opsNullInt64(input.TimeToFirstTokenMs),
|
||||
opsNullString(input.RequestBodyJSON),
|
||||
input.RequestBodyTruncated,
|
||||
opsNullInt(input.RequestBodyBytes),
|
||||
opsNullString(input.RequestHeadersJSON),
|
||||
input.IsRetryable,
|
||||
input.RetryCount,
|
||||
input.CreatedAt,
|
||||
}
|
||||
}
|
||||
@ -222,13 +210,10 @@ SELECT
|
||||
COALESCE(e.upstream_status_code, e.status_code, 0),
|
||||
COALESCE(e.platform, ''),
|
||||
COALESCE(e.model, ''),
|
||||
COALESCE(e.is_retryable, false),
|
||||
COALESCE(e.retry_count, 0),
|
||||
COALESCE(e.resolved, false),
|
||||
e.resolved_at,
|
||||
e.resolved_by_user_id,
|
||||
COALESCE(u2.email, ''),
|
||||
e.resolved_retry_id,
|
||||
COALESCE(e.client_request_id, ''),
|
||||
COALESCE(e.request_id, ''),
|
||||
COALESCE(e.error_message, ''),
|
||||
@ -277,7 +262,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
var resolvedAt sql.NullTime
|
||||
var resolvedBy sql.NullInt64
|
||||
var resolvedByName string
|
||||
var resolvedRetryID sql.NullInt64
|
||||
var requestType sql.NullInt64
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
@ -290,13 +274,10 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
&statusCode,
|
||||
&item.Platform,
|
||||
&item.Model,
|
||||
&item.IsRetryable,
|
||||
&item.RetryCount,
|
||||
&item.Resolved,
|
||||
&resolvedAt,
|
||||
&resolvedBy,
|
||||
&resolvedByName,
|
||||
&resolvedRetryID,
|
||||
&item.ClientRequestID,
|
||||
&item.RequestID,
|
||||
&item.Message,
|
||||
@ -327,10 +308,6 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
item.ResolvedByUserID = &v
|
||||
}
|
||||
item.ResolvedByUserName = resolvedByName
|
||||
if resolvedRetryID.Valid {
|
||||
v := resolvedRetryID.Int64
|
||||
item.ResolvedRetryID = &v
|
||||
}
|
||||
item.StatusCode = int(statusCode.Int64)
|
||||
if clientIP.Valid {
|
||||
s := clientIP.String
|
||||
@ -393,12 +370,9 @@ SELECT
|
||||
COALESCE(e.upstream_status_code, e.status_code, 0),
|
||||
COALESCE(e.platform, ''),
|
||||
COALESCE(e.model, ''),
|
||||
COALESCE(e.is_retryable, false),
|
||||
COALESCE(e.retry_count, 0),
|
||||
COALESCE(e.resolved, false),
|
||||
e.resolved_at,
|
||||
e.resolved_by_user_id,
|
||||
e.resolved_retry_id,
|
||||
COALESCE(e.client_request_id, ''),
|
||||
COALESCE(e.request_id, ''),
|
||||
COALESCE(e.error_message, ''),
|
||||
@ -428,11 +402,7 @@ SELECT
|
||||
e.routing_latency_ms,
|
||||
e.upstream_latency_ms,
|
||||
e.response_latency_ms,
|
||||
e.time_to_first_token_ms,
|
||||
COALESCE(e.request_body::text, ''),
|
||||
e.request_body_truncated,
|
||||
e.request_body_bytes,
|
||||
COALESCE(e.request_headers::text, '')
|
||||
e.time_to_first_token_ms
|
||||
FROM ops_error_logs e
|
||||
LEFT JOIN users u ON e.user_id = u.id
|
||||
LEFT JOIN accounts a ON e.account_id = a.id
|
||||
@ -445,7 +415,6 @@ LIMIT 1`
|
||||
var upstreamStatusCode sql.NullInt64
|
||||
var resolvedAt sql.NullTime
|
||||
var resolvedBy sql.NullInt64
|
||||
var resolvedRetryID sql.NullInt64
|
||||
var clientIP sql.NullString
|
||||
var userID sql.NullInt64
|
||||
var apiKeyID sql.NullInt64
|
||||
@ -456,7 +425,6 @@ LIMIT 1`
|
||||
var upstreamLatency sql.NullInt64
|
||||
var responseLatency sql.NullInt64
|
||||
var ttft sql.NullInt64
|
||||
var requestBodyBytes sql.NullInt64
|
||||
var requestType sql.NullInt64
|
||||
|
||||
err := r.db.QueryRowContext(ctx, q, id).Scan(
|
||||
@ -470,12 +438,9 @@ LIMIT 1`
|
||||
&statusCode,
|
||||
&out.Platform,
|
||||
&out.Model,
|
||||
&out.IsRetryable,
|
||||
&out.RetryCount,
|
||||
&out.Resolved,
|
||||
&resolvedAt,
|
||||
&resolvedBy,
|
||||
&resolvedRetryID,
|
||||
&out.ClientRequestID,
|
||||
&out.RequestID,
|
||||
&out.Message,
|
||||
@ -506,10 +471,6 @@ LIMIT 1`
|
||||
&upstreamLatency,
|
||||
&responseLatency,
|
||||
&ttft,
|
||||
&out.RequestBody,
|
||||
&out.RequestBodyTruncated,
|
||||
&requestBodyBytes,
|
||||
&out.RequestHeaders,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -524,10 +485,6 @@ LIMIT 1`
|
||||
v := resolvedBy.Int64
|
||||
out.ResolvedByUserID = &v
|
||||
}
|
||||
if resolvedRetryID.Valid {
|
||||
v := resolvedRetryID.Int64
|
||||
out.ResolvedRetryID = &v
|
||||
}
|
||||
if clientIP.Valid {
|
||||
s := clientIP.String
|
||||
out.ClientIP = &s
|
||||
@ -572,25 +529,11 @@ LIMIT 1`
|
||||
v := ttft.Int64
|
||||
out.TimeToFirstTokenMs = &v
|
||||
}
|
||||
if requestBodyBytes.Valid {
|
||||
v := int(requestBodyBytes.Int64)
|
||||
out.RequestBodyBytes = &v
|
||||
}
|
||||
if requestType.Valid {
|
||||
v := int16(requestType.Int64)
|
||||
out.RequestType = &v
|
||||
}
|
||||
|
||||
// Normalize request_body to empty string when stored as JSON null.
|
||||
out.RequestBody = strings.TrimSpace(out.RequestBody)
|
||||
if out.RequestBody == "null" {
|
||||
out.RequestBody = ""
|
||||
}
|
||||
// Normalize request_headers to empty string when stored as JSON null.
|
||||
out.RequestHeaders = strings.TrimSpace(out.RequestHeaders)
|
||||
if out.RequestHeaders == "null" {
|
||||
out.RequestHeaders = ""
|
||||
}
|
||||
// Normalize upstream_errors to empty string when stored as JSON null.
|
||||
out.UpstreamErrors = strings.TrimSpace(out.UpstreamErrors)
|
||||
if out.UpstreamErrors == "null" {
|
||||
@ -600,398 +543,7 @@ LIMIT 1`
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) InsertRetryAttempt(ctx context.Context, input *service.OpsInsertRetryAttemptInput) (int64, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return 0, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if input == nil {
|
||||
return 0, fmt.Errorf("nil input")
|
||||
}
|
||||
if input.SourceErrorID <= 0 {
|
||||
return 0, fmt.Errorf("invalid source_error_id")
|
||||
}
|
||||
if strings.TrimSpace(input.Mode) == "" {
|
||||
return 0, fmt.Errorf("invalid mode")
|
||||
}
|
||||
|
||||
q := `
|
||||
INSERT INTO ops_retry_attempts (
|
||||
requested_by_user_id,
|
||||
source_error_id,
|
||||
mode,
|
||||
pinned_account_id,
|
||||
status,
|
||||
started_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6
|
||||
) RETURNING id`
|
||||
|
||||
var id int64
|
||||
err := r.db.QueryRowContext(
|
||||
ctx,
|
||||
q,
|
||||
opsNullInt64(&input.RequestedByUserID),
|
||||
input.SourceErrorID,
|
||||
strings.TrimSpace(input.Mode),
|
||||
opsNullInt64(input.PinnedAccountID),
|
||||
strings.TrimSpace(input.Status),
|
||||
input.StartedAt,
|
||||
).Scan(&id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) UpdateRetryAttempt(ctx context.Context, input *service.OpsUpdateRetryAttemptInput) error {
|
||||
if r == nil || r.db == nil {
|
||||
return fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if input == nil {
|
||||
return fmt.Errorf("nil input")
|
||||
}
|
||||
if input.ID <= 0 {
|
||||
return fmt.Errorf("invalid id")
|
||||
}
|
||||
|
||||
q := `
|
||||
UPDATE ops_retry_attempts
|
||||
SET
|
||||
status = $2,
|
||||
finished_at = $3,
|
||||
duration_ms = $4,
|
||||
success = $5,
|
||||
http_status_code = $6,
|
||||
upstream_request_id = $7,
|
||||
used_account_id = $8,
|
||||
response_preview = $9,
|
||||
response_truncated = $10,
|
||||
result_request_id = $11,
|
||||
result_error_id = $12,
|
||||
error_message = $13
|
||||
WHERE id = $1`
|
||||
|
||||
_, err := r.db.ExecContext(
|
||||
ctx,
|
||||
q,
|
||||
input.ID,
|
||||
strings.TrimSpace(input.Status),
|
||||
nullTime(input.FinishedAt),
|
||||
input.DurationMs,
|
||||
nullBool(input.Success),
|
||||
nullInt(input.HTTPStatusCode),
|
||||
opsNullString(input.UpstreamRequestID),
|
||||
nullInt64(input.UsedAccountID),
|
||||
opsNullString(input.ResponsePreview),
|
||||
nullBool(input.ResponseTruncated),
|
||||
opsNullString(input.ResultRequestID),
|
||||
nullInt64(input.ResultErrorID),
|
||||
opsNullString(input.ErrorMessage),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *opsRepository) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*service.OpsRetryAttempt, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if sourceErrorID <= 0 {
|
||||
return nil, fmt.Errorf("invalid source_error_id")
|
||||
}
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
id,
|
||||
created_at,
|
||||
COALESCE(requested_by_user_id, 0),
|
||||
source_error_id,
|
||||
COALESCE(mode, ''),
|
||||
pinned_account_id,
|
||||
COALESCE(status, ''),
|
||||
started_at,
|
||||
finished_at,
|
||||
duration_ms,
|
||||
success,
|
||||
http_status_code,
|
||||
upstream_request_id,
|
||||
used_account_id,
|
||||
response_preview,
|
||||
response_truncated,
|
||||
result_request_id,
|
||||
result_error_id,
|
||||
error_message
|
||||
FROM ops_retry_attempts
|
||||
WHERE source_error_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1`
|
||||
|
||||
var out service.OpsRetryAttempt
|
||||
var pinnedAccountID sql.NullInt64
|
||||
var requestedBy sql.NullInt64
|
||||
var startedAt sql.NullTime
|
||||
var finishedAt sql.NullTime
|
||||
var durationMs sql.NullInt64
|
||||
var success sql.NullBool
|
||||
var httpStatusCode sql.NullInt64
|
||||
var upstreamRequestID sql.NullString
|
||||
var usedAccountID sql.NullInt64
|
||||
var responsePreview sql.NullString
|
||||
var responseTruncated sql.NullBool
|
||||
var resultRequestID sql.NullString
|
||||
var resultErrorID sql.NullInt64
|
||||
var errorMessage sql.NullString
|
||||
|
||||
err := r.db.QueryRowContext(ctx, q, sourceErrorID).Scan(
|
||||
&out.ID,
|
||||
&out.CreatedAt,
|
||||
&requestedBy,
|
||||
&out.SourceErrorID,
|
||||
&out.Mode,
|
||||
&pinnedAccountID,
|
||||
&out.Status,
|
||||
&startedAt,
|
||||
&finishedAt,
|
||||
&durationMs,
|
||||
&success,
|
||||
&httpStatusCode,
|
||||
&upstreamRequestID,
|
||||
&usedAccountID,
|
||||
&responsePreview,
|
||||
&responseTruncated,
|
||||
&resultRequestID,
|
||||
&resultErrorID,
|
||||
&errorMessage,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out.RequestedByUserID = requestedBy.Int64
|
||||
if pinnedAccountID.Valid {
|
||||
v := pinnedAccountID.Int64
|
||||
out.PinnedAccountID = &v
|
||||
}
|
||||
if startedAt.Valid {
|
||||
t := startedAt.Time
|
||||
out.StartedAt = &t
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
t := finishedAt.Time
|
||||
out.FinishedAt = &t
|
||||
}
|
||||
if durationMs.Valid {
|
||||
v := durationMs.Int64
|
||||
out.DurationMs = &v
|
||||
}
|
||||
if success.Valid {
|
||||
v := success.Bool
|
||||
out.Success = &v
|
||||
}
|
||||
if httpStatusCode.Valid {
|
||||
v := int(httpStatusCode.Int64)
|
||||
out.HTTPStatusCode = &v
|
||||
}
|
||||
if upstreamRequestID.Valid {
|
||||
s := upstreamRequestID.String
|
||||
out.UpstreamRequestID = &s
|
||||
}
|
||||
if usedAccountID.Valid {
|
||||
v := usedAccountID.Int64
|
||||
out.UsedAccountID = &v
|
||||
}
|
||||
if responsePreview.Valid {
|
||||
s := responsePreview.String
|
||||
out.ResponsePreview = &s
|
||||
}
|
||||
if responseTruncated.Valid {
|
||||
v := responseTruncated.Bool
|
||||
out.ResponseTruncated = &v
|
||||
}
|
||||
if resultRequestID.Valid {
|
||||
s := resultRequestID.String
|
||||
out.ResultRequestID = &s
|
||||
}
|
||||
if resultErrorID.Valid {
|
||||
v := resultErrorID.Int64
|
||||
out.ResultErrorID = &v
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
s := errorMessage.String
|
||||
out.ErrorMessage = &s
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// nullTime converts t into a sql.NullTime. A zero time (t.IsZero()) yields the
// zero sql.NullTime, i.e. Valid == false; any other time is carried through
// with Valid == true.
func nullTime(t time.Time) sql.NullTime {
	var nt sql.NullTime
	if !t.IsZero() {
		nt.Time, nt.Valid = t, true
	}
	return nt
}
|
||||
|
||||
// nullBool converts an optional bool into a sql.NullBool. A nil pointer yields
// the zero sql.NullBool (Valid == false); otherwise the pointed-to value is
// copied and Valid is set.
func nullBool(v *bool) sql.NullBool {
	var nb sql.NullBool
	if v != nil {
		nb.Bool, nb.Valid = *v, true
	}
	return nb
}
|
||||
|
||||
func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*service.OpsRetryAttempt, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if sourceErrorID <= 0 {
|
||||
return nil, fmt.Errorf("invalid source_error_id")
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
if limit > 200 {
|
||||
limit = 200
|
||||
}
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
r.id,
|
||||
r.created_at,
|
||||
COALESCE(r.requested_by_user_id, 0),
|
||||
r.source_error_id,
|
||||
COALESCE(r.mode, ''),
|
||||
r.pinned_account_id,
|
||||
COALESCE(pa.name, ''),
|
||||
COALESCE(r.status, ''),
|
||||
r.started_at,
|
||||
r.finished_at,
|
||||
r.duration_ms,
|
||||
r.success,
|
||||
r.http_status_code,
|
||||
r.upstream_request_id,
|
||||
r.used_account_id,
|
||||
COALESCE(ua.name, ''),
|
||||
r.response_preview,
|
||||
r.response_truncated,
|
||||
r.result_request_id,
|
||||
r.result_error_id,
|
||||
r.error_message
|
||||
FROM ops_retry_attempts r
|
||||
LEFT JOIN accounts pa ON r.pinned_account_id = pa.id
|
||||
LEFT JOIN accounts ua ON r.used_account_id = ua.id
|
||||
WHERE r.source_error_id = $1
|
||||
ORDER BY r.created_at DESC
|
||||
LIMIT $2`
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
out := make([]*service.OpsRetryAttempt, 0, 16)
|
||||
for rows.Next() {
|
||||
var item service.OpsRetryAttempt
|
||||
var pinnedAccountID sql.NullInt64
|
||||
var pinnedAccountName string
|
||||
var requestedBy sql.NullInt64
|
||||
var startedAt sql.NullTime
|
||||
var finishedAt sql.NullTime
|
||||
var durationMs sql.NullInt64
|
||||
var success sql.NullBool
|
||||
var httpStatusCode sql.NullInt64
|
||||
var upstreamRequestID sql.NullString
|
||||
var usedAccountID sql.NullInt64
|
||||
var usedAccountName string
|
||||
var responsePreview sql.NullString
|
||||
var responseTruncated sql.NullBool
|
||||
var resultRequestID sql.NullString
|
||||
var resultErrorID sql.NullInt64
|
||||
var errorMessage sql.NullString
|
||||
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.CreatedAt,
|
||||
&requestedBy,
|
||||
&item.SourceErrorID,
|
||||
&item.Mode,
|
||||
&pinnedAccountID,
|
||||
&pinnedAccountName,
|
||||
&item.Status,
|
||||
&startedAt,
|
||||
&finishedAt,
|
||||
&durationMs,
|
||||
&success,
|
||||
&httpStatusCode,
|
||||
&upstreamRequestID,
|
||||
&usedAccountID,
|
||||
&usedAccountName,
|
||||
&responsePreview,
|
||||
&responseTruncated,
|
||||
&resultRequestID,
|
||||
&resultErrorID,
|
||||
&errorMessage,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
item.RequestedByUserID = requestedBy.Int64
|
||||
if pinnedAccountID.Valid {
|
||||
v := pinnedAccountID.Int64
|
||||
item.PinnedAccountID = &v
|
||||
}
|
||||
item.PinnedAccountName = pinnedAccountName
|
||||
if startedAt.Valid {
|
||||
t := startedAt.Time
|
||||
item.StartedAt = &t
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
t := finishedAt.Time
|
||||
item.FinishedAt = &t
|
||||
}
|
||||
if durationMs.Valid {
|
||||
v := durationMs.Int64
|
||||
item.DurationMs = &v
|
||||
}
|
||||
if success.Valid {
|
||||
v := success.Bool
|
||||
item.Success = &v
|
||||
}
|
||||
if httpStatusCode.Valid {
|
||||
v := int(httpStatusCode.Int64)
|
||||
item.HTTPStatusCode = &v
|
||||
}
|
||||
if upstreamRequestID.Valid {
|
||||
item.UpstreamRequestID = &upstreamRequestID.String
|
||||
}
|
||||
if usedAccountID.Valid {
|
||||
v := usedAccountID.Int64
|
||||
item.UsedAccountID = &v
|
||||
}
|
||||
item.UsedAccountName = usedAccountName
|
||||
if responsePreview.Valid {
|
||||
item.ResponsePreview = &responsePreview.String
|
||||
}
|
||||
if responseTruncated.Valid {
|
||||
v := responseTruncated.Bool
|
||||
item.ResponseTruncated = &v
|
||||
}
|
||||
if resultRequestID.Valid {
|
||||
item.ResultRequestID = &resultRequestID.String
|
||||
}
|
||||
if resultErrorID.Valid {
|
||||
v := resultErrorID.Int64
|
||||
item.ResultErrorID = &v
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
item.ErrorMessage = &errorMessage.String
|
||||
}
|
||||
out = append(out, &item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
|
||||
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error {
|
||||
if r == nil || r.db == nil {
|
||||
return fmt.Errorf("nil ops repository")
|
||||
}
|
||||
@ -1004,8 +556,7 @@ UPDATE ops_error_logs
|
||||
SET
|
||||
resolved = $2,
|
||||
resolved_at = $3,
|
||||
resolved_by_user_id = $4,
|
||||
resolved_retry_id = $5
|
||||
resolved_by_user_id = $4
|
||||
WHERE id = $1`
|
||||
|
||||
at := sql.NullTime{}
|
||||
@ -1023,7 +574,6 @@ WHERE id = $1`
|
||||
resolved,
|
||||
at,
|
||||
nullInt64(resolvedByUserID),
|
||||
nullInt64(resolvedRetryID),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
44
backend/internal/repository/ops_repo_replay_cleanup_test.go
Normal file
44
backend/internal/repository/ops_repo_replay_cleanup_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
)
|
||||
|
||||
func TestOpsErrorLogInsertDoesNotPersistRequestReplayFields(t *testing.T) {
|
||||
disallowedColumns := []string{
|
||||
"request_body",
|
||||
"request_headers",
|
||||
"request_body_truncated",
|
||||
"request_body_bytes",
|
||||
"is_retryable",
|
||||
"retry_count",
|
||||
"resolved_retry_id",
|
||||
}
|
||||
|
||||
insertSQL := strings.ToLower(insertOpsErrorLogSQL)
|
||||
for _, column := range disallowedColumns {
|
||||
if strings.Contains(insertSQL, column) {
|
||||
t.Fatalf("ops error log insert still references dropped replay column %q", column)
|
||||
}
|
||||
}
|
||||
|
||||
inputType := reflect.TypeOf(service.OpsInsertErrorLogInput{})
|
||||
disallowedFields := []string{
|
||||
"RequestBodyJSON",
|
||||
"RequestBodyTruncated",
|
||||
"RequestBodyBytes",
|
||||
"RequestHeadersJSON",
|
||||
"IsRetryable",
|
||||
"RetryCount",
|
||||
"ResolvedRetryID",
|
||||
}
|
||||
for _, field := range disallowedFields {
|
||||
if _, ok := inputType.FieldByName(field); ok {
|
||||
t.Fatalf("OpsInsertErrorLogInput still carries replay field %q", field)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -174,22 +174,17 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
|
||||
// Error logs (legacy)
|
||||
ops.GET("/errors", h.Admin.Ops.GetErrorLogs)
|
||||
ops.GET("/errors/:id", h.Admin.Ops.GetErrorLogByID)
|
||||
ops.GET("/errors/:id/retries", h.Admin.Ops.ListRetryAttempts)
|
||||
ops.POST("/errors/:id/retry", h.Admin.Ops.RetryErrorRequest)
|
||||
ops.PUT("/errors/:id/resolve", h.Admin.Ops.UpdateErrorResolution)
|
||||
|
||||
// Request errors (client-visible failures)
|
||||
ops.GET("/request-errors", h.Admin.Ops.ListRequestErrors)
|
||||
ops.GET("/request-errors/:id", h.Admin.Ops.GetRequestError)
|
||||
ops.GET("/request-errors/:id/upstream-errors", h.Admin.Ops.ListRequestErrorUpstreamErrors)
|
||||
ops.POST("/request-errors/:id/retry-client", h.Admin.Ops.RetryRequestErrorClient)
|
||||
ops.POST("/request-errors/:id/upstream-errors/:idx/retry", h.Admin.Ops.RetryRequestErrorUpstreamEvent)
|
||||
ops.PUT("/request-errors/:id/resolve", h.Admin.Ops.ResolveRequestError)
|
||||
|
||||
// Upstream errors (independent upstream failures)
|
||||
ops.GET("/upstream-errors", h.Admin.Ops.ListUpstreamErrors)
|
||||
ops.GET("/upstream-errors/:id", h.Admin.Ops.GetUpstreamError)
|
||||
ops.POST("/upstream-errors/:id/retry", h.Admin.Ops.RetryUpstreamError)
|
||||
ops.PUT("/upstream-errors/:id/resolve", h.Admin.Ops.ResolveUpstreamError)
|
||||
|
||||
// Request drilldown (success + error)
|
||||
|
||||
@ -628,11 +628,6 @@ urlFallbackLoop:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Capture upstream request body for ops retry of this attempt.
|
||||
if p.c != nil && len(p.body) > 0 {
|
||||
p.c.Set(OpsUpstreamRequestBodyKey, string(p.body))
|
||||
}
|
||||
|
||||
resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency)
|
||||
if err == nil && resp == nil {
|
||||
err = errors.New("upstream returned nil response")
|
||||
|
||||
@ -188,11 +188,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardStreamPreservesBodyAnd
|
||||
require.NotContains(t, rec.Body.String(), `"cache_read_input_tokens":7`, "透传输出不应被网关改写")
|
||||
require.Equal(t, 7, result.Usage.CacheReadInputTokens, "计费 usage 解析应保留 cached_tokens 兼容")
|
||||
require.Empty(t, rec.Header().Get("Set-Cookie"), "响应头应经过安全过滤")
|
||||
rawBody, ok := c.Get(OpsUpstreamRequestBodyKey)
|
||||
require.True(t, ok)
|
||||
bodyBytes, ok := rawBody.([]byte)
|
||||
require.True(t, ok, "应以 []byte 形式缓存上游请求体,避免重复 string 拷贝")
|
||||
require.Equal(t, "claude-3-haiku-20240307", gjson.GetBytes(bodyBytes, "model").String(), "缓存的上游请求体应包含映射后的模型")
|
||||
}
|
||||
|
||||
func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardCountTokensPreservesBody(t *testing.T) {
|
||||
@ -938,10 +933,6 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_UpstreamRequest
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "upstream request failed")
|
||||
require.Equal(t, http.StatusBadGateway, rec.Code)
|
||||
rawBody, ok := c.Get(OpsUpstreamRequestBodyKey)
|
||||
require.True(t, ok)
|
||||
_, ok = rawBody.([]byte)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_EmptyResponseBody(t *testing.T) {
|
||||
|
||||
@ -4525,9 +4525,6 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
// Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400.
|
||||
body = StripEmptyTextBlocks(body)
|
||||
|
||||
// 重试间复用同一请求体,避免每次 string(body) 产生额外分配。
|
||||
setOpsUpstreamRequestBody(c, body)
|
||||
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
retryStart := time.Now()
|
||||
@ -5018,9 +5015,6 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
|
||||
// Pre-filter: strip empty text blocks (including nested in tool_result) to prevent upstream 400.
|
||||
input.Body = StripEmptyTextBlocks(input.Body)
|
||||
|
||||
// 重试间复用同一请求体,避免每次 string(body) 产生额外分配。
|
||||
setOpsUpstreamRequestBody(c, input.Body)
|
||||
|
||||
var resp *http.Response
|
||||
retryStart := time.Now()
|
||||
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
|
||||
@ -6201,7 +6195,6 @@ func (s *GatewayService) buildUpstreamRequestAnthropicVertex(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setOpsUpstreamRequestBody(c, vertexBody)
|
||||
fullURL, err := buildVertexAnthropicURL(account.VertexProjectID(), account.VertexLocation(modelID), modelID, reqStream)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -123,10 +123,6 @@ func (s *GeminiMessagesCompatService) forwardClaudeBodyAsChatCompletions(
|
||||
}
|
||||
requestIDHeader = idHeader
|
||||
|
||||
if c != nil {
|
||||
c.Set(OpsUpstreamRequestBodyKey, string(geminiReq))
|
||||
}
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
|
||||
@ -766,12 +766,6 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
|
||||
}
|
||||
requestIDHeader = idHeader
|
||||
|
||||
// Capture upstream request body for ops retry of this attempt.
|
||||
if c != nil {
|
||||
// In this code path `body` is already the JSON sent to upstream.
|
||||
c.Set(OpsUpstreamRequestBodyKey, string(body))
|
||||
}
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
@ -1293,12 +1287,6 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
|
||||
}
|
||||
requestIDHeader = idHeader
|
||||
|
||||
// Capture upstream request body for ops retry of this attempt.
|
||||
if c != nil {
|
||||
// In this code path `body` is already the JSON sent to upstream.
|
||||
c.Set(OpsUpstreamRequestBodyKey, string(body))
|
||||
}
|
||||
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
|
||||
@ -2473,9 +2473,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Capture upstream request body for ops retry of this attempt.
|
||||
setOpsUpstreamRequestBody(c, body)
|
||||
|
||||
// 命中 WS 时仅走 WebSocket Mode;不再自动回退 HTTP。
|
||||
if wsDecision.Transport == OpenAIUpstreamTransportResponsesWebsocketV2 {
|
||||
wsReqBody := reqBody
|
||||
@ -2748,7 +2745,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("serialize invalid_encrypted_content retry body: %w", err)
|
||||
}
|
||||
setOpsUpstreamRequestBody(c, body)
|
||||
httpInvalidEncryptedContentRetryTried = true
|
||||
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Retrying non-WSv2 request once after invalid_encrypted_content (account: %s)", account.Name)
|
||||
continue
|
||||
@ -2786,6 +2782,10 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
||||
serviceTier := extractOpenAIServiceTier(reqBody)
|
||||
releaseOpenAIParsedRequestBody(c)
|
||||
|
||||
// Handle normal response
|
||||
var usage *OpenAIUsage
|
||||
var firstTokenMs *int
|
||||
@ -2821,9 +2821,6 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
usage = &OpenAIUsage{}
|
||||
}
|
||||
|
||||
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
||||
serviceTier := extractOpenAIServiceTier(reqBody)
|
||||
|
||||
forwardResult := &OpenAIForwardResult{
|
||||
RequestID: resp.Header.Get("x-request-id"),
|
||||
Usage: *usage,
|
||||
@ -3006,7 +3003,6 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
setOpsUpstreamRequestBody(c, body)
|
||||
if c != nil {
|
||||
c.Set("openai_passthrough", true)
|
||||
}
|
||||
@ -3045,6 +3041,8 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
||||
return nil, s.handleErrorResponsePassthrough(ctx, resp, c, account, body)
|
||||
}
|
||||
|
||||
serviceTier := extractOpenAIServiceTierFromBody(body)
|
||||
|
||||
var usage *OpenAIUsage
|
||||
var firstTokenMs *int
|
||||
imageCount := 0
|
||||
@ -3081,7 +3079,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
||||
Usage: *usage,
|
||||
Model: reqModel,
|
||||
UpstreamModel: upstreamPassthroughModel,
|
||||
ServiceTier: extractOpenAIServiceTierFromBody(body),
|
||||
ServiceTier: serviceTier,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: reqStream,
|
||||
OpenAIWSMode: false,
|
||||
@ -6503,6 +6501,13 @@ func getOpenAIRequestBodyMap(c *gin.Context, body []byte) (map[string]any, error
|
||||
return reqBody, nil
|
||||
}
|
||||
|
||||
func releaseOpenAIParsedRequestBody(c *gin.Context) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
delete(c.Keys, OpenAIParsedRequestBodyKey)
|
||||
}
|
||||
|
||||
func extractOpenAIReasoningEffort(reqBody map[string]any, requestedModel string) *string {
|
||||
if value, present := getOpenAIReasoningEffortFromReqBody(reqBody); present {
|
||||
if value == "" {
|
||||
|
||||
@ -588,11 +588,7 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesAPIKey(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !parsed.Multipart {
|
||||
setOpsUpstreamRequestBody(c, forwardBody)
|
||||
}
|
||||
|
||||
upstreamCtx, releaseUpstreamCtx := detachUpstreamContext(ctx)
|
||||
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, parsed.Stream)
|
||||
defer releaseUpstreamCtx()
|
||||
|
||||
token, _, err := s.GetAccessToken(upstreamCtx, account)
|
||||
|
||||
@ -979,8 +979,6 @@ func (s *OpenAIGatewayService) forwardOpenAIImagesOAuth(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setOpsUpstreamRequestBody(c, responsesBody)
|
||||
|
||||
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, responsesBody, token, true, parsed.StickySessionSeed(), false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -26,7 +26,6 @@ type opsCleanupTarget struct {
|
||||
|
||||
type opsCleanupDeletedCounts struct {
|
||||
errorLogs int64
|
||||
retryAttempts int64
|
||||
alertEvents int64
|
||||
systemLogs int64
|
||||
logAudits int64
|
||||
@ -37,9 +36,8 @@ type opsCleanupDeletedCounts struct {
|
||||
|
||||
func (c opsCleanupDeletedCounts) String() string {
|
||||
return fmt.Sprintf(
|
||||
"error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
|
||||
"error_logs=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
|
||||
c.errorLogs,
|
||||
c.retryAttempts,
|
||||
c.alertEvents,
|
||||
c.systemLogs,
|
||||
c.logAudits,
|
||||
|
||||
@ -299,7 +299,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
|
||||
|
||||
targets := []opsCleanupTarget{
|
||||
{effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs},
|
||||
{effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts},
|
||||
{effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents},
|
||||
{effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs},
|
||||
{effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits},
|
||||
|
||||
@ -37,14 +37,10 @@ type OpsErrorLog struct {
|
||||
Platform string `json:"platform"`
|
||||
Model string `json:"model"`
|
||||
|
||||
IsRetryable bool `json:"is_retryable"`
|
||||
RetryCount int `json:"retry_count"`
|
||||
|
||||
Resolved bool `json:"resolved"`
|
||||
ResolvedAt *time.Time `json:"resolved_at"`
|
||||
ResolvedByUserID *int64 `json:"resolved_by_user_id"`
|
||||
ResolvedByUserName string `json:"resolved_by_user_name"`
|
||||
ResolvedRetryID *int64 `json:"resolved_retry_id"`
|
||||
ResolvedStatusRaw string `json:"-"`
|
||||
|
||||
ClientRequestID string `json:"client_request_id"`
|
||||
@ -89,12 +85,6 @@ type OpsErrorLogDetail struct {
|
||||
ResponseLatencyMs *int64 `json:"response_latency_ms"`
|
||||
TimeToFirstTokenMs *int64 `json:"time_to_first_token_ms"`
|
||||
|
||||
// Retry context
|
||||
RequestBody string `json:"request_body"`
|
||||
RequestBodyTruncated bool `json:"request_body_truncated"`
|
||||
RequestBodyBytes *int `json:"request_body_bytes"`
|
||||
RequestHeaders string `json:"request_headers,omitempty"`
|
||||
|
||||
// vNext metric semantics
|
||||
IsBusinessLimited bool `json:"is_business_limited"`
|
||||
}
|
||||
@ -136,55 +126,3 @@ type OpsErrorLogList struct {
|
||||
Page int `json:"page"`
|
||||
PageSize int `json:"page_size"`
|
||||
}
|
||||
|
||||
// OpsRetryAttempt is the JSON-serialisable view of one retry attempt made for
// a source error. Pointer fields are optional and marshal as null when unset.
type OpsRetryAttempt struct {
	ID        int64     `json:"id"`
	CreatedAt time.Time `json:"created_at"`

	// Who asked for the retry, for which error, and how it was requested.
	RequestedByUserID int64  `json:"requested_by_user_id"`
	SourceErrorID     int64  `json:"source_error_id"`
	Mode              string `json:"mode"`
	PinnedAccountID   *int64 `json:"pinned_account_id"`
	PinnedAccountName string `json:"pinned_account_name"`

	// Lifecycle of the attempt.
	Status     string     `json:"status"`
	StartedAt  *time.Time `json:"started_at"`
	FinishedAt *time.Time `json:"finished_at"`
	DurationMs *int64     `json:"duration_ms"`

	// Persisted execution results (best-effort).
	Success           *bool   `json:"success"`
	HTTPStatusCode    *int    `json:"http_status_code"`
	UpstreamRequestID *string `json:"upstream_request_id"`
	UsedAccountID     *int64  `json:"used_account_id"`
	UsedAccountName   string  `json:"used_account_name"`
	ResponsePreview   *string `json:"response_preview"`
	ResponseTruncated *bool   `json:"response_truncated"`

	// Optional correlation with the request/error the retry produced.
	ResultRequestID *string `json:"result_request_id"`
	ResultErrorID   *int64  `json:"result_error_id"`

	ErrorMessage *string `json:"error_message"`
}
|
||||
|
||||
// OpsRetryResult is the JSON-serialisable outcome of a single retry
// execution: the attempt it belongs to, the accounts involved, the captured
// response summary and the timing of the run.
type OpsRetryResult struct {
	AttemptID int64  `json:"attempt_id"`
	Mode      string `json:"mode"`
	Status    string `json:"status"`

	// Optional account ids; marshalled as null when unset.
	PinnedAccountID *int64 `json:"pinned_account_id"`
	UsedAccountID   *int64 `json:"used_account_id"`

	HTTPStatusCode    int    `json:"http_status_code"`
	UpstreamRequestID string `json:"upstream_request_id"`

	ResponsePreview   string `json:"response_preview"`
	ResponseTruncated bool   `json:"response_truncated"`

	ErrorMessage string `json:"error_message"`

	StartedAt  time.Time `json:"started_at"`
	FinishedAt time.Time `json:"finished_at"`
	DurationMs int64     `json:"duration_ms"`
}
|
||||
|
||||
@ -16,11 +16,7 @@ type OpsRepository interface {
|
||||
DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
|
||||
InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error
|
||||
|
||||
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
|
||||
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
|
||||
GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error)
|
||||
ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error)
|
||||
UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error
|
||||
UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error
|
||||
|
||||
// Lightweight window stats (for realtime WS / quick sampling).
|
||||
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
|
||||
@ -121,51 +117,9 @@ type OpsInsertErrorLogInput struct {
|
||||
ResponseLatencyMs *int64
|
||||
TimeToFirstTokenMs *int64
|
||||
|
||||
RequestBodyJSON *string // sanitized json string (not raw bytes)
|
||||
RequestBodyTruncated bool
|
||||
RequestBodyBytes *int
|
||||
RequestHeadersJSON *string // optional json string
|
||||
|
||||
IsRetryable bool
|
||||
RetryCount int
|
||||
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// OpsInsertRetryAttemptInput carries the values needed to record a new retry
// attempt: the requesting user, the source error, the retry mode, an optional
// pinned account, and the initial status and start time.
type OpsInsertRetryAttemptInput struct {
	RequestedByUserID int64
	SourceErrorID     int64
	Mode              string
	PinnedAccountID   *int64 // optional; nil when no account is pinned

	// running|queued etc.
	Status    string
	StartedAt time.Time
}
|
||||
|
||||
// OpsUpdateRetryAttemptInput carries the final state written back to an
// existing retry attempt identified by ID. Pointer fields are optional.
type OpsUpdateRetryAttemptInput struct {
	ID int64

	// succeeded|failed
	Status     string
	FinishedAt time.Time
	DurationMs int64

	// Persisted execution results (best-effort).
	Success           *bool
	HTTPStatusCode    *int
	UpstreamRequestID *string
	UsedAccountID     *int64
	ResponsePreview   *string
	ResponseTruncated *bool

	// Optional correlation (legacy fields kept).
	ResultRequestID *string
	ResultErrorID   *int64

	ErrorMessage *string
}
|
||||
|
||||
type OpsInsertSystemMetricsInput struct {
|
||||
CreatedAt time.Time
|
||||
WindowMinutes int
|
||||
|
||||
@ -69,23 +69,7 @@ func (m *opsRepoMock) InsertSystemLogCleanupAudit(ctx context.Context, input *Op
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *opsRepoMock) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *opsRepoMock) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *opsRepoMock) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *opsRepoMock) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) {
|
||||
return []*OpsRetryAttempt{}, nil
|
||||
}
|
||||
|
||||
func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
|
||||
func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedAt *time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -1,726 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/domain"
|
||||
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
|
||||
// Retry modes accepted by RetryError.
const (
	// OpsRetryModeClient replays the request through account selection.
	OpsRetryModeClient = "client"
	// OpsRetryModeUpstream replays the request against one pinned account.
	OpsRetryModeUpstream = "upstream"
)

// Status values recorded for a retry attempt.
const (
	opsRetryStatusRunning   = "running"
	opsRetryStatusSucceeded = "succeeded"
	opsRetryStatusFailed    = "failed"
)

// Limits applied while executing a retry.
const (
	// opsRetryTimeout bounds a single retry execution.
	opsRetryTimeout = time.Minute
	// opsRetryCaptureBytesLimit caps how many response bytes are buffered.
	opsRetryCaptureBytesLimit = 64 << 10
	// opsRetryResponsePreviewMax caps the size of the returned response preview.
	opsRetryResponsePreviewMax = 8 << 10
	// opsRetryMinIntervalPerError is the cooldown between retries of one error.
	opsRetryMinIntervalPerError = 10 * time.Second
	// opsRetryMaxAccountSwitches caps account failovers during a client retry.
	opsRetryMaxAccountSwitches = 3
)
|
||||
|
||||
// opsRetryRequestHeaderAllowlist lists, in lower case, the stored request
// headers that may be copied onto a replayed request. Any header not listed
// here is not replayed.
var opsRetryRequestHeaderAllowlist = map[string]bool{
	"anthropic-version": true,
	"anthropic-beta":    true,
}
|
||||
|
||||
// opsRetryRequestType classifies a recorded request by the API family it
// targeted, which determines the gateway service used to replay it.
type opsRetryRequestType string

// Request families recognised by the retry machinery.
const (
	opsRetryTypeMessages  = opsRetryRequestType("messages")
	opsRetryTypeOpenAI    = opsRetryRequestType("openai_responses")
	opsRetryTypeGeminiV1B = opsRetryRequestType("gemini_v1beta")
)
|
||||
|
||||
// limitedResponseWriter is an in-memory response writer that keeps at most
// limit bytes of the body while counting every byte it was asked to write.
type limitedResponseWriter struct {
	header      http.Header
	wroteHeader bool

	limit        int
	totalWritten int64
	buf          bytes.Buffer
}

// newLimitedResponseWriter returns a writer that buffers up to limit bytes.
// A non-positive limit is raised to 1.
func newLimitedResponseWriter(limit int) *limitedResponseWriter {
	w := &limitedResponseWriter{header: http.Header{}, limit: limit}
	if w.limit <= 0 {
		w.limit = 1
	}
	return w
}

// Header returns the header map owned by this writer.
func (w *limitedResponseWriter) Header() http.Header {
	return w.header
}

// WriteHeader marks the header as written. The status code itself is not
// recorded by this type.
func (w *limitedResponseWriter) WriteHeader(statusCode int) {
	if !w.wroteHeader {
		w.wroteHeader = true
	}
}

// Write buffers as much of p as still fits under the limit and discards the
// rest. It always reports len(p) bytes written and a nil error so callers do
// not treat truncation as a write failure.
func (w *limitedResponseWriter) Write(p []byte) (int, error) {
	if !w.wroteHeader {
		w.WriteHeader(http.StatusOK)
	}

	n := len(p)
	w.totalWritten += int64(n)

	if room := w.limit - w.buf.Len(); room > 0 {
		chunk := p
		if n > room {
			chunk = p[:room]
		}
		_, _ = w.buf.Write(chunk)
	}

	return n, nil
}

// Flush does nothing; it exists so the writer can be used where flushing is
// expected.
func (w *limitedResponseWriter) Flush() {}

// bodyBytes returns the buffered (possibly truncated) body.
func (w *limitedResponseWriter) bodyBytes() []byte {
	return w.buf.Bytes()
}

// truncated reports whether more bytes were written than the limit allows.
func (w *limitedResponseWriter) truncated() bool {
	return int64(w.limit) < w.totalWritten
}
|
||||
|
||||
// OpsRetryModeUpstreamEvent is the mode recorded for a retry of one captured
// upstream event.
const OpsRetryModeUpstreamEvent = "upstream_event"
|
||||
|
||||
func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
|
||||
mode = strings.ToLower(strings.TrimSpace(mode))
|
||||
switch mode {
|
||||
case OpsRetryModeClient, OpsRetryModeUpstream:
|
||||
default:
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream")
|
||||
}
|
||||
|
||||
errorLog, err := s.GetErrorLogByID(ctx, errorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if errorLog == nil {
|
||||
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
|
||||
}
|
||||
if strings.TrimSpace(errorLog.RequestBody) == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
|
||||
}
|
||||
|
||||
var pinned *int64
|
||||
if mode == OpsRetryModeUpstream {
|
||||
if pinnedAccountID != nil && *pinnedAccountID > 0 {
|
||||
pinned = pinnedAccountID
|
||||
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
|
||||
pinned = errorLog.AccountID
|
||||
} else {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
|
||||
}
|
||||
}
|
||||
|
||||
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, mode, mode, pinned, errorLog)
|
||||
}
|
||||
|
||||
// RetryUpstreamEvent retries a specific upstream attempt captured inside ops_error_logs.upstream_errors.
|
||||
// idx is 0-based. It always pins the original event account_id.
|
||||
func (s *OpsService) RetryUpstreamEvent(ctx context.Context, requestedByUserID int64, errorID int64, idx int) (*OpsRetryResult, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
if idx < 0 {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_UPSTREAM_IDX", "invalid upstream idx")
|
||||
}
|
||||
|
||||
errorLog, err := s.GetErrorLogByID(ctx, errorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if errorLog == nil {
|
||||
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
|
||||
}
|
||||
|
||||
events, err := ParseOpsUpstreamErrors(errorLog.UpstreamErrors)
|
||||
if err != nil {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENTS_INVALID", "invalid upstream_errors")
|
||||
}
|
||||
if idx >= len(events) {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_IDX_OOB", "upstream idx out of range")
|
||||
}
|
||||
ev := events[idx]
|
||||
if ev == nil {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENT_MISSING", "upstream event missing")
|
||||
}
|
||||
if ev.AccountID <= 0 {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
|
||||
}
|
||||
|
||||
upstreamBody := strings.TrimSpace(ev.UpstreamRequestBody)
|
||||
if upstreamBody == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_NO_REQUEST_BODY", "No upstream request body found to retry")
|
||||
}
|
||||
|
||||
override := *errorLog
|
||||
override.RequestBody = upstreamBody
|
||||
pinned := ev.AccountID
|
||||
|
||||
// Persist as upstream_event, execute as upstream pinned retry.
|
||||
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, OpsRetryModeUpstreamEvent, OpsRetryModeUpstream, &pinned, &override)
|
||||
}
|
||||
|
||||
func (s *OpsService) retryWithErrorLog(ctx context.Context, requestedByUserID int64, errorID int64, mode string, execMode string, pinnedAccountID *int64, errorLog *OpsErrorLogDetail) (*OpsRetryResult, error) {
|
||||
latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err)
|
||||
}
|
||||
if latest != nil {
|
||||
if strings.EqualFold(latest.Status, opsRetryStatusRunning) || strings.EqualFold(latest.Status, "queued") {
|
||||
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
|
||||
}
|
||||
|
||||
lastAttemptAt := latest.CreatedAt
|
||||
if latest.FinishedAt != nil && !latest.FinishedAt.IsZero() {
|
||||
lastAttemptAt = *latest.FinishedAt
|
||||
} else if latest.StartedAt != nil && !latest.StartedAt.IsZero() {
|
||||
lastAttemptAt = *latest.StartedAt
|
||||
}
|
||||
|
||||
if time.Since(lastAttemptAt) < opsRetryMinIntervalPerError {
|
||||
return nil, infraerrors.Conflict("OPS_RETRY_TOO_FREQUENT", "Please wait before retrying this error again")
|
||||
}
|
||||
}
|
||||
|
||||
if errorLog == nil || strings.TrimSpace(errorLog.RequestBody) == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
|
||||
}
|
||||
|
||||
var pinned *int64
|
||||
if execMode == OpsRetryModeUpstream {
|
||||
if pinnedAccountID != nil && *pinnedAccountID > 0 {
|
||||
pinned = pinnedAccountID
|
||||
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
|
||||
pinned = errorLog.AccountID
|
||||
} else {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
|
||||
}
|
||||
}
|
||||
|
||||
startedAt := time.Now()
|
||||
attemptID, err := s.opsRepo.InsertRetryAttempt(ctx, &OpsInsertRetryAttemptInput{
|
||||
RequestedByUserID: requestedByUserID,
|
||||
SourceErrorID: errorID,
|
||||
Mode: mode,
|
||||
PinnedAccountID: pinned,
|
||||
Status: opsRetryStatusRunning,
|
||||
StartedAt: startedAt,
|
||||
})
|
||||
if err != nil {
|
||||
var pqErr *pq.Error
|
||||
if errors.As(err, &pqErr) && string(pqErr.Code) == "23505" {
|
||||
return nil, infraerrors.Conflict("OPS_RETRY_IN_PROGRESS", "A retry is already in progress for this error")
|
||||
}
|
||||
return nil, infraerrors.InternalServer("OPS_RETRY_CREATE_ATTEMPT_FAILED", "Failed to create retry attempt").WithCause(err)
|
||||
}
|
||||
|
||||
result := &OpsRetryResult{
|
||||
AttemptID: attemptID,
|
||||
Mode: mode,
|
||||
Status: opsRetryStatusFailed,
|
||||
PinnedAccountID: pinned,
|
||||
HTTPStatusCode: 0,
|
||||
UpstreamRequestID: "",
|
||||
ResponsePreview: "",
|
||||
ResponseTruncated: false,
|
||||
ErrorMessage: "",
|
||||
StartedAt: startedAt,
|
||||
}
|
||||
|
||||
execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout)
|
||||
defer cancel()
|
||||
|
||||
execRes := s.executeRetry(execCtx, errorLog, execMode, pinned)
|
||||
|
||||
finishedAt := time.Now()
|
||||
result.FinishedAt = finishedAt
|
||||
result.DurationMs = finishedAt.Sub(startedAt).Milliseconds()
|
||||
|
||||
if execRes != nil {
|
||||
result.Status = execRes.status
|
||||
result.UsedAccountID = execRes.usedAccountID
|
||||
result.HTTPStatusCode = execRes.httpStatusCode
|
||||
result.UpstreamRequestID = execRes.upstreamRequestID
|
||||
result.ResponsePreview = execRes.responsePreview
|
||||
result.ResponseTruncated = execRes.responseTruncated
|
||||
result.ErrorMessage = execRes.errorMessage
|
||||
}
|
||||
|
||||
updateCtx, updateCancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer updateCancel()
|
||||
|
||||
var updateErrMsg *string
|
||||
if strings.TrimSpace(result.ErrorMessage) != "" {
|
||||
msg := result.ErrorMessage
|
||||
updateErrMsg = &msg
|
||||
}
|
||||
// Keep legacy result_request_id empty; use upstream_request_id instead.
|
||||
var resultRequestID *string
|
||||
|
||||
finalStatus := result.Status
|
||||
if strings.TrimSpace(finalStatus) == "" {
|
||||
finalStatus = opsRetryStatusFailed
|
||||
}
|
||||
|
||||
success := strings.EqualFold(finalStatus, opsRetryStatusSucceeded)
|
||||
httpStatus := result.HTTPStatusCode
|
||||
upstreamReqID := result.UpstreamRequestID
|
||||
usedAccountID := result.UsedAccountID
|
||||
preview := result.ResponsePreview
|
||||
truncated := result.ResponseTruncated
|
||||
|
||||
if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{
|
||||
ID: attemptID,
|
||||
Status: finalStatus,
|
||||
FinishedAt: finishedAt,
|
||||
DurationMs: result.DurationMs,
|
||||
Success: &success,
|
||||
HTTPStatusCode: &httpStatus,
|
||||
UpstreamRequestID: &upstreamReqID,
|
||||
UsedAccountID: usedAccountID,
|
||||
ResponsePreview: &preview,
|
||||
ResponseTruncated: &truncated,
|
||||
ResultRequestID: resultRequestID,
|
||||
ErrorMessage: updateErrMsg,
|
||||
}); err != nil {
|
||||
log.Printf("[Ops] UpdateRetryAttempt failed: %v", err)
|
||||
} else if success {
|
||||
if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil {
|
||||
log.Printf("[Ops] UpdateErrorResolution failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// opsRetryExecution is the internal outcome of one retry execution.
type opsRetryExecution struct {
	// status is one of the opsRetryStatus* values.
	status string

	// usedAccountID is the account the request was forwarded with, if any.
	usedAccountID     *int64
	httpStatusCode    int
	upstreamRequestID string

	// responsePreview holds the captured start of the response body;
	// responseTruncated reports whether the body was cut short.
	responsePreview   string
	responseTruncated bool

	// errorMessage describes the failure; empty on success.
	errorMessage string
}
|
||||
|
||||
func (s *OpsService) executeRetry(ctx context.Context, errorLog *OpsErrorLogDetail, mode string, pinnedAccountID *int64) *opsRetryExecution {
|
||||
if errorLog == nil {
|
||||
return &opsRetryExecution{
|
||||
status: opsRetryStatusFailed,
|
||||
errorMessage: "missing error log",
|
||||
}
|
||||
}
|
||||
|
||||
reqType := detectOpsRetryType(errorLog.RequestPath)
|
||||
bodyBytes := []byte(errorLog.RequestBody)
|
||||
|
||||
switch reqType {
|
||||
case opsRetryTypeMessages:
|
||||
bodyBytes = FilterThinkingBlocksForRetry(bodyBytes)
|
||||
case opsRetryTypeOpenAI, opsRetryTypeGeminiV1B:
|
||||
// No-op
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(mode)) {
|
||||
case OpsRetryModeUpstream:
|
||||
if pinnedAccountID == nil || *pinnedAccountID <= 0 {
|
||||
return &opsRetryExecution{
|
||||
status: opsRetryStatusFailed,
|
||||
errorMessage: "pinned_account_id required for upstream retry",
|
||||
}
|
||||
}
|
||||
return s.executePinnedRetry(ctx, reqType, errorLog, bodyBytes, *pinnedAccountID)
|
||||
case OpsRetryModeClient:
|
||||
return s.executeClientRetry(ctx, reqType, errorLog, bodyBytes)
|
||||
default:
|
||||
return &opsRetryExecution{
|
||||
status: opsRetryStatusFailed,
|
||||
errorMessage: "invalid retry mode",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func detectOpsRetryType(path string) opsRetryRequestType {
|
||||
p := strings.ToLower(strings.TrimSpace(path))
|
||||
switch {
|
||||
case strings.Contains(p, "/responses"), strings.Contains(p, "/images/"):
|
||||
return opsRetryTypeOpenAI
|
||||
case strings.Contains(p, "/v1beta/"):
|
||||
return opsRetryTypeGeminiV1B
|
||||
default:
|
||||
return opsRetryTypeMessages
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) executePinnedRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, pinnedAccountID int64) *opsRetryExecution {
|
||||
if s.accountRepo == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account repository not available"}
|
||||
}
|
||||
|
||||
account, err := s.accountRepo.GetByID(ctx, pinnedAccountID)
|
||||
if err != nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("account not found: %v", err)}
|
||||
}
|
||||
if account == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account not found"}
|
||||
}
|
||||
if !account.IsSchedulable() {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account is not schedulable"}
|
||||
}
|
||||
if errorLog.GroupID != nil && *errorLog.GroupID > 0 {
|
||||
if !containsInt64(account.GroupIDs, *errorLog.GroupID) {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "pinned account is not in the same group as the original request"}
|
||||
}
|
||||
}
|
||||
|
||||
var release func()
|
||||
if s.concurrencyService != nil {
|
||||
acq, err := s.concurrencyService.AcquireAccountSlot(ctx, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: fmt.Sprintf("acquire account slot failed: %v", err)}
|
||||
}
|
||||
if acq == nil || !acq.Acquired {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "account concurrency limit reached"}
|
||||
}
|
||||
release = acq.ReleaseFunc
|
||||
}
|
||||
if release != nil {
|
||||
defer release()
|
||||
}
|
||||
|
||||
usedID := account.ID
|
||||
exec := s.executeWithAccount(ctx, reqType, errorLog, body, account)
|
||||
exec.usedAccountID = &usedID
|
||||
if exec.status == "" {
|
||||
exec.status = opsRetryStatusFailed
|
||||
}
|
||||
return exec
|
||||
}
|
||||
|
||||
func (s *OpsService) executeClientRetry(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) *opsRetryExecution {
|
||||
groupID := errorLog.GroupID
|
||||
if groupID == nil || *groupID <= 0 {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "group_id missing; cannot reselect account"}
|
||||
}
|
||||
|
||||
model, stream, parsedErr := extractRetryModelAndStream(reqType, errorLog, body)
|
||||
if parsedErr != nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: parsedErr.Error()}
|
||||
}
|
||||
_ = stream
|
||||
|
||||
excluded := make(map[int64]struct{})
|
||||
switches := 0
|
||||
|
||||
for {
|
||||
if switches >= opsRetryMaxAccountSwitches {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "retry failed after exhausting account failovers"}
|
||||
}
|
||||
|
||||
selection, selErr := s.selectAccountForRetry(ctx, reqType, groupID, model, excluded)
|
||||
if selErr != nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: selErr.Error()}
|
||||
}
|
||||
if selection == nil || selection.Account == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: ErrNoAvailableAccounts.Error()}
|
||||
}
|
||||
|
||||
account := selection.Account
|
||||
if !selection.Acquired || selection.ReleaseFunc == nil {
|
||||
excluded[account.ID] = struct{}{}
|
||||
switches++
|
||||
continue
|
||||
}
|
||||
|
||||
attemptCtx := ctx
|
||||
if switches > 0 {
|
||||
attemptCtx = WithAccountSwitchCount(attemptCtx, switches, false)
|
||||
}
|
||||
exec := func() *opsRetryExecution {
|
||||
defer selection.ReleaseFunc()
|
||||
return s.executeWithAccount(attemptCtx, reqType, errorLog, body, account)
|
||||
}()
|
||||
|
||||
if exec != nil {
|
||||
if exec.status == opsRetryStatusSucceeded {
|
||||
usedID := account.ID
|
||||
exec.usedAccountID = &usedID
|
||||
return exec
|
||||
}
|
||||
// If the gateway services ask for failover, try another account.
|
||||
if s.isFailoverError(exec.errorMessage) {
|
||||
excluded[account.ID] = struct{}{}
|
||||
switches++
|
||||
continue
|
||||
}
|
||||
usedID := account.ID
|
||||
exec.usedAccountID = &usedID
|
||||
return exec
|
||||
}
|
||||
|
||||
excluded[account.ID] = struct{}{}
|
||||
switches++
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) selectAccountForRetry(ctx context.Context, reqType opsRetryRequestType, groupID *int64, model string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) {
|
||||
switch reqType {
|
||||
case opsRetryTypeOpenAI:
|
||||
if s.openAIGatewayService == nil {
|
||||
return nil, fmt.Errorf("openai gateway service not available")
|
||||
}
|
||||
return s.openAIGatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs)
|
||||
case opsRetryTypeGeminiV1B, opsRetryTypeMessages:
|
||||
if s.gatewayService == nil {
|
||||
return nil, fmt.Errorf("gateway service not available")
|
||||
}
|
||||
return s.gatewayService.SelectAccountWithLoadAwareness(ctx, groupID, "", model, excludedIDs, "", int64(0)) // 重试不使用会话限制
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported retry type: %s", reqType)
|
||||
}
|
||||
}
|
||||
|
||||
func extractRetryModelAndStream(reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte) (model string, stream bool, err error) {
|
||||
switch reqType {
|
||||
case opsRetryTypeMessages:
|
||||
parsed, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic)
|
||||
if parseErr != nil {
|
||||
return "", false, fmt.Errorf("failed to parse messages request body: %w", parseErr)
|
||||
}
|
||||
return parsed.Model, parsed.Stream, nil
|
||||
case opsRetryTypeOpenAI:
|
||||
var v struct {
|
||||
Model string `json:"model"`
|
||||
Stream bool `json:"stream"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &v); err != nil {
|
||||
return "", false, fmt.Errorf("failed to parse openai request body: %w", err)
|
||||
}
|
||||
return strings.TrimSpace(v.Model), v.Stream, nil
|
||||
case opsRetryTypeGeminiV1B:
|
||||
if strings.TrimSpace(errorLog.Model) == "" {
|
||||
return "", false, fmt.Errorf("missing model for gemini v1beta retry")
|
||||
}
|
||||
return strings.TrimSpace(errorLog.Model), errorLog.Stream, nil
|
||||
default:
|
||||
return "", false, fmt.Errorf("unsupported retry type: %s", reqType)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) executeWithAccount(ctx context.Context, reqType opsRetryRequestType, errorLog *OpsErrorLogDetail, body []byte, account *Account) *opsRetryExecution {
|
||||
if account == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "missing account"}
|
||||
}
|
||||
|
||||
c, w := newOpsRetryContext(ctx, errorLog)
|
||||
|
||||
var err error
|
||||
switch reqType {
|
||||
case opsRetryTypeOpenAI:
|
||||
if s.openAIGatewayService == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "openai gateway service not available"}
|
||||
}
|
||||
_, err = s.openAIGatewayService.Forward(ctx, c, account, body)
|
||||
case opsRetryTypeGeminiV1B:
|
||||
if s.geminiCompatService == nil || s.antigravityGatewayService == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini services not available"}
|
||||
}
|
||||
modelName := strings.TrimSpace(errorLog.Model)
|
||||
action := "generateContent"
|
||||
if errorLog.Stream {
|
||||
action = "streamGenerateContent"
|
||||
}
|
||||
if account.Platform == PlatformAntigravity {
|
||||
_, err = s.antigravityGatewayService.ForwardGemini(ctx, c, account, modelName, action, errorLog.Stream, body, false)
|
||||
} else {
|
||||
_, err = s.geminiCompatService.ForwardNative(ctx, c, account, modelName, action, errorLog.Stream, body)
|
||||
}
|
||||
case opsRetryTypeMessages:
|
||||
switch account.Platform {
|
||||
case PlatformAntigravity:
|
||||
if s.antigravityGatewayService == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "antigravity gateway service not available"}
|
||||
}
|
||||
_, err = s.antigravityGatewayService.Forward(ctx, c, account, body, false)
|
||||
case PlatformGemini:
|
||||
if s.geminiCompatService == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gemini gateway service not available"}
|
||||
}
|
||||
_, err = s.geminiCompatService.Forward(ctx, c, account, body)
|
||||
default:
|
||||
if s.gatewayService == nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "gateway service not available"}
|
||||
}
|
||||
parsedReq, parseErr := ParseGatewayRequest(body, domain.PlatformAnthropic)
|
||||
if parseErr != nil {
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "failed to parse request body"}
|
||||
}
|
||||
_, err = s.gatewayService.Forward(ctx, c, account, parsedReq)
|
||||
}
|
||||
default:
|
||||
return &opsRetryExecution{status: opsRetryStatusFailed, errorMessage: "unsupported retry type"}
|
||||
}
|
||||
|
||||
statusCode := http.StatusOK
|
||||
if c != nil && c.Writer != nil {
|
||||
statusCode = c.Writer.Status()
|
||||
}
|
||||
|
||||
upstreamReqID := extractUpstreamRequestID(c)
|
||||
preview, truncated := extractResponsePreview(w)
|
||||
|
||||
exec := &opsRetryExecution{
|
||||
status: opsRetryStatusFailed,
|
||||
httpStatusCode: statusCode,
|
||||
upstreamRequestID: upstreamReqID,
|
||||
responsePreview: preview,
|
||||
responseTruncated: truncated,
|
||||
errorMessage: "",
|
||||
}
|
||||
|
||||
if err == nil && statusCode < 400 {
|
||||
exec.status = opsRetryStatusSucceeded
|
||||
return exec
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
exec.errorMessage = err.Error()
|
||||
} else {
|
||||
exec.errorMessage = fmt.Sprintf("upstream returned status %d", statusCode)
|
||||
}
|
||||
|
||||
return exec
|
||||
}
|
||||
|
||||
func newOpsRetryContext(ctx context.Context, errorLog *OpsErrorLogDetail) (*gin.Context, *limitedResponseWriter) {
|
||||
w := newLimitedResponseWriter(opsRetryCaptureBytesLimit)
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
path := "/"
|
||||
if errorLog != nil && strings.TrimSpace(errorLog.RequestPath) != "" {
|
||||
path = errorLog.RequestPath
|
||||
}
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost"+path, bytes.NewReader(nil))
|
||||
req.Header.Set("content-type", "application/json")
|
||||
if errorLog != nil && strings.TrimSpace(errorLog.UserAgent) != "" {
|
||||
req.Header.Set("user-agent", errorLog.UserAgent)
|
||||
}
|
||||
// Restore a minimal, whitelisted subset of request headers to improve retry fidelity
|
||||
// (e.g. anthropic-beta / anthropic-version). Never replay auth credentials.
|
||||
if errorLog != nil && strings.TrimSpace(errorLog.RequestHeaders) != "" {
|
||||
var stored map[string]string
|
||||
if err := json.Unmarshal([]byte(errorLog.RequestHeaders), &stored); err == nil {
|
||||
for k, v := range stored {
|
||||
key := strings.TrimSpace(k)
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
if !opsRetryRequestHeaderAllowlist[strings.ToLower(key)] {
|
||||
continue
|
||||
}
|
||||
val := strings.TrimSpace(v)
|
||||
if val == "" {
|
||||
continue
|
||||
}
|
||||
req.Header.Set(key, val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.Request = req
|
||||
SetOpenAIClientTransport(c, OpenAIClientTransportHTTP)
|
||||
return c, w
|
||||
}
|
||||
|
||||
func extractUpstreamRequestID(c *gin.Context) string {
|
||||
if c == nil || c.Writer == nil {
|
||||
return ""
|
||||
}
|
||||
h := c.Writer.Header()
|
||||
if h == nil {
|
||||
return ""
|
||||
}
|
||||
for _, key := range []string{"x-request-id", "X-Request-Id", "X-Request-ID"} {
|
||||
if v := strings.TrimSpace(h.Get(key)); v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func extractResponsePreview(w *limitedResponseWriter) (preview string, truncated bool) {
|
||||
if w == nil {
|
||||
return "", false
|
||||
}
|
||||
b := bytes.TrimSpace(w.bodyBytes())
|
||||
if len(b) == 0 {
|
||||
return "", w.truncated()
|
||||
}
|
||||
if len(b) > opsRetryResponsePreviewMax {
|
||||
return string(b[:opsRetryResponsePreviewMax]), true
|
||||
}
|
||||
return string(b), w.truncated()
|
||||
}
|
||||
|
||||
// containsInt64 reports whether needle occurs in items. A nil or empty slice
// never contains anything.
func containsInt64(items []int64, needle int64) bool {
	for i := 0; i < len(items); i++ {
		if items[i] == needle {
			return true
		}
	}
	return false
}
|
||||
|
||||
func (s *OpsService) isFailoverError(message string) bool {
|
||||
msg := strings.ToLower(strings.TrimSpace(message))
|
||||
if msg == "" {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(msg, "upstream error:") && strings.Contains(msg, "failover")
|
||||
}
|
||||
@ -1,47 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewOpsRetryContext_SetsHTTPTransportAndRequestHeaders(t *testing.T) {
|
||||
errorLog := &OpsErrorLogDetail{
|
||||
OpsErrorLog: OpsErrorLog{
|
||||
RequestPath: "/openai/v1/responses",
|
||||
},
|
||||
UserAgent: "ops-retry-agent/1.0",
|
||||
RequestHeaders: `{
|
||||
"anthropic-beta":"beta-v1",
|
||||
"ANTHROPIC-VERSION":"2023-06-01",
|
||||
"authorization":"Bearer should-not-forward"
|
||||
}`,
|
||||
}
|
||||
|
||||
c, w := newOpsRetryContext(context.Background(), errorLog)
|
||||
require.NotNil(t, c)
|
||||
require.NotNil(t, w)
|
||||
require.NotNil(t, c.Request)
|
||||
|
||||
require.Equal(t, "/openai/v1/responses", c.Request.URL.Path)
|
||||
require.Equal(t, "application/json", c.Request.Header.Get("Content-Type"))
|
||||
require.Equal(t, "ops-retry-agent/1.0", c.Request.Header.Get("User-Agent"))
|
||||
require.Equal(t, "beta-v1", c.Request.Header.Get("anthropic-beta"))
|
||||
require.Equal(t, "2023-06-01", c.Request.Header.Get("anthropic-version"))
|
||||
require.Empty(t, c.Request.Header.Get("authorization"), "未在白名单内的敏感头不应被重放")
|
||||
require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c))
|
||||
}
|
||||
|
||||
func TestNewOpsRetryContext_InvalidHeadersJSONStillSetsHTTPTransport(t *testing.T) {
|
||||
errorLog := &OpsErrorLogDetail{
|
||||
RequestHeaders: "{invalid-json",
|
||||
}
|
||||
|
||||
c, _ := newOpsRetryContext(context.Background(), errorLog)
|
||||
require.NotNil(t, c)
|
||||
require.NotNil(t, c.Request)
|
||||
require.Equal(t, "/", c.Request.URL.Path)
|
||||
require.Equal(t, OpenAIClientTransportHTTP, GetOpenAIClientTransport(c))
|
||||
}
|
||||
@ -16,26 +16,9 @@ import (
|
||||
var ErrOpsDisabled = infraerrors.NotFound("OPS_DISABLED", "Ops monitoring is disabled")
|
||||
|
||||
const (
|
||||
opsMaxStoredRequestBodyBytes = 256 * 1024
|
||||
opsMaxStoredErrorBodyBytes = 20 * 1024
|
||||
opsMaxStoredErrorBodyBytes = 20 * 1024
|
||||
)
|
||||
|
||||
// PrepareOpsRequestBodyForQueue 在入队前对请求体执行脱敏与裁剪,返回可直接写入 OpsInsertErrorLogInput 的字段。
|
||||
// 该方法用于避免异步队列持有大块原始请求体,减少错误风暴下的内存放大风险。
|
||||
func PrepareOpsRequestBodyForQueue(raw []byte) (requestBodyJSON *string, truncated bool, requestBodyBytes *int) {
|
||||
if len(raw) == 0 {
|
||||
return nil, false, nil
|
||||
}
|
||||
sanitized, truncated, bytesLen := sanitizeAndTrimRequestBody(raw, opsMaxStoredRequestBodyBytes)
|
||||
if sanitized != "" {
|
||||
out := sanitized
|
||||
requestBodyJSON = &out
|
||||
}
|
||||
n := bytesLen
|
||||
requestBodyBytes = &n
|
||||
return requestBodyJSON, truncated, requestBodyBytes
|
||||
}
|
||||
|
||||
// OpsService provides ingestion and query APIs for the Ops monitoring module.
|
||||
type OpsService struct {
|
||||
opsRepo OpsRepository
|
||||
@ -138,8 +121,8 @@ func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error {
|
||||
prepared, ok, err := s.prepareErrorLogInput(ctx, entry, rawRequestBody)
|
||||
func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput) error {
|
||||
prepared, ok, err := s.prepareErrorLogInput(ctx, entry)
|
||||
if err != nil {
|
||||
log.Printf("[Ops] RecordError prepare failed: %v", err)
|
||||
return err
|
||||
@ -162,7 +145,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE
|
||||
}
|
||||
prepared := make([]*OpsInsertErrorLogInput, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
item, ok, err := s.prepareErrorLogInput(ctx, entry, nil)
|
||||
item, ok, err := s.prepareErrorLogInput(ctx, entry)
|
||||
if err != nil {
|
||||
log.Printf("[Ops] RecordErrorBatch prepare failed: %v", err)
|
||||
continue
|
||||
@ -198,7 +181,7 @@ func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertE
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) (*OpsInsertErrorLogInput, bool, error) {
|
||||
func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput) (*OpsInsertErrorLogInput, bool, error) {
|
||||
if entry == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
@ -224,11 +207,6 @@ func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertE
|
||||
entry.ErrorType = "api_error"
|
||||
}
|
||||
|
||||
// Sanitize + trim request body (errors only).
|
||||
if len(rawRequestBody) > 0 {
|
||||
entry.RequestBodyJSON, entry.RequestBodyTruncated, entry.RequestBodyBytes = PrepareOpsRequestBodyForQueue(rawRequestBody)
|
||||
}
|
||||
|
||||
// Sanitize + truncate error_body to avoid storing sensitive data.
|
||||
if strings.TrimSpace(entry.ErrorBody) != "" {
|
||||
sanitized, _ := sanitizeErrorBodyForStorage(entry.ErrorBody, opsMaxStoredErrorBodyBytes)
|
||||
@ -315,25 +293,6 @@ func sanitizeOpsUpstreamErrors(entry *OpsInsertErrorLogInput) error {
|
||||
out.Detail = ""
|
||||
}
|
||||
|
||||
out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody)
|
||||
if out.UpstreamRequestBody != "" {
|
||||
// Reuse the same sanitization/trimming strategy as request body storage.
|
||||
// Keep it small so it is safe to persist in ops_error_logs JSON.
|
||||
sanitizedBody, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024)
|
||||
if sanitizedBody != "" {
|
||||
out.UpstreamRequestBody = sanitizedBody
|
||||
if truncated {
|
||||
out.Kind = strings.TrimSpace(out.Kind)
|
||||
if out.Kind == "" {
|
||||
out.Kind = "upstream"
|
||||
}
|
||||
out.Kind = out.Kind + ":request_body_truncated"
|
||||
}
|
||||
} else {
|
||||
out.UpstreamRequestBody = ""
|
||||
}
|
||||
}
|
||||
|
||||
// Drop fully-empty events (can happen if only status code was known).
|
||||
if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" {
|
||||
continue
|
||||
@ -381,27 +340,7 @@ func (s *OpsService) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLo
|
||||
return detail, nil
|
||||
}
|
||||
|
||||
func (s *OpsService) ListRetryAttemptsByErrorID(ctx context.Context, errorID int64, limit int) ([]*OpsRetryAttempt, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
if errorID <= 0 {
|
||||
return nil, infraerrors.BadRequest("OPS_ERROR_INVALID_ID", "invalid error id")
|
||||
}
|
||||
items, err := s.opsRepo.ListRetryAttemptsByErrorID(ctx, errorID, limit)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return []*OpsRetryAttempt{}, nil
|
||||
}
|
||||
return nil, infraerrors.InternalServer("OPS_RETRY_LIST_FAILED", "Failed to list retry attempts").WithCause(err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64) error {
|
||||
func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64) error {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -418,10 +357,10 @@ func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, r
|
||||
}
|
||||
return infraerrors.InternalServer("OPS_ERROR_LOAD_FAILED", "Failed to load ops error log").WithCause(err)
|
||||
}
|
||||
return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, resolvedRetryID, nil)
|
||||
return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, nil)
|
||||
}
|
||||
|
||||
func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) {
|
||||
func sanitizeAndTrimJSONPayload(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) {
|
||||
bytesLen = len(raw)
|
||||
if len(raw) == 0 {
|
||||
return "", false, 0
|
||||
@ -429,7 +368,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
|
||||
|
||||
var decoded any
|
||||
if err := json.Unmarshal(raw, &decoded); err != nil {
|
||||
// If it's not valid JSON, don't store (retry would not be reliable anyway).
|
||||
// If it is not valid JSON, fall back to the caller's non-JSON handling.
|
||||
return "", false, bytesLen
|
||||
}
|
||||
|
||||
@ -465,7 +404,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
|
||||
// This avoids downstream code that expects certain top-level keys from crashing.
|
||||
if root, ok := decoded.(map[string]any); ok {
|
||||
placeholder := shallowCopyMap(root)
|
||||
placeholder["request_body_truncated"] = true
|
||||
placeholder["payload_truncated"] = true
|
||||
|
||||
// Replace potentially huge arrays/strings, but keep the keys present.
|
||||
for _, k := range []string{"messages", "contents", "input", "prompt"} {
|
||||
@ -488,7 +427,7 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr
|
||||
}
|
||||
|
||||
// Final fallback: minimal valid JSON.
|
||||
encoded4, err4 := json.Marshal(map[string]any{"request_body_truncated": true})
|
||||
encoded4, err4 := json.Marshal(map[string]any{"payload_truncated": true})
|
||||
if err4 != nil {
|
||||
return "", true, bytesLen
|
||||
}
|
||||
@ -732,7 +671,7 @@ func sanitizeErrorBodyForStorage(raw string, maxBytes int) (sanitized string, tr
|
||||
}
|
||||
|
||||
// Prefer JSON-safe sanitization when possible.
|
||||
if out, trunc, _ := sanitizeAndTrimRequestBody([]byte(raw), maxBytes); out != "" {
|
||||
if out, trunc, _ := sanitizeAndTrimJSONPayload([]byte(raw), maxBytes); out != "" {
|
||||
return out, trunc
|
||||
}
|
||||
|
||||
|
||||
@ -31,11 +31,10 @@ func TestOpsServiceRecordErrorBatch_SanitizesAndBatches(t *testing.T) {
|
||||
UpstreamErrorDetail: strPtr(detail),
|
||||
UpstreamErrors: []*OpsUpstreamErrorEvent{
|
||||
{
|
||||
AccountID: -2,
|
||||
UpstreamStatusCode: 429,
|
||||
Message: " token leaked ",
|
||||
Detail: `{"refresh_token":"secret"}`,
|
||||
UpstreamRequestBody: `{"api_key":"secret","messages":[{"role":"user","content":"hello"}]}`,
|
||||
AccountID: -2,
|
||||
UpstreamStatusCode: 429,
|
||||
Message: " token leaked ",
|
||||
Detail: `{"refresh_token":"secret"}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@ -1,60 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPrepareOpsRequestBodyForQueue_EmptyBody(t *testing.T) {
|
||||
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(nil)
|
||||
require.Nil(t, requestBodyJSON)
|
||||
require.False(t, truncated)
|
||||
require.Nil(t, requestBodyBytes)
|
||||
}
|
||||
|
||||
func TestPrepareOpsRequestBodyForQueue_InvalidJSON(t *testing.T) {
|
||||
raw := []byte("{invalid-json")
|
||||
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
|
||||
require.Nil(t, requestBodyJSON)
|
||||
require.False(t, truncated)
|
||||
require.NotNil(t, requestBodyBytes)
|
||||
require.Equal(t, len(raw), *requestBodyBytes)
|
||||
}
|
||||
|
||||
// TestPrepareOpsRequestBodyForQueue_RedactSensitiveFields verifies that a
// small, valid JSON body is kept (not truncated, byte count equal to the raw
// length) and that the top-level "api_key" and the nested
// "headers.authorization" values are replaced with "[REDACTED]".
func TestPrepareOpsRequestBodyForQueue_RedactSensitiveFields(t *testing.T) {
|
||||
raw := []byte(`{
|
||||
"model":"claude-3-5-sonnet-20241022",
|
||||
"api_key":"sk-test-123",
|
||||
"headers":{"authorization":"Bearer secret-token"},
|
||||
"messages":[{"role":"user","content":"hello"}]
|
||||
}`)
|
||||
|
||||
// The body is small and valid, so it must be stored without truncation.
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
|
||||
require.NotNil(t, requestBodyJSON)
|
||||
require.NotNil(t, requestBodyBytes)
|
||||
require.False(t, truncated)
|
||||
require.Equal(t, len(raw), *requestBodyBytes)
|
||||
|
||||
// Decode the stored JSON and check both the top-level and the nested secret.
var body map[string]any
|
||||
require.NoError(t, json.Unmarshal([]byte(*requestBodyJSON), &body))
|
||||
require.Equal(t, "[REDACTED]", body["api_key"])
|
||||
headers, ok := body["headers"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "[REDACTED]", headers["authorization"])
|
||||
}
|
||||
|
||||
func TestPrepareOpsRequestBodyForQueue_LargeBodyTruncated(t *testing.T) {
|
||||
largeMsg := strings.Repeat("x", opsMaxStoredRequestBodyBytes*2)
|
||||
raw := []byte(`{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"` + largeMsg + `"}]}`)
|
||||
|
||||
requestBodyJSON, truncated, requestBodyBytes := PrepareOpsRequestBodyForQueue(raw)
|
||||
require.NotNil(t, requestBodyJSON)
|
||||
require.NotNil(t, requestBodyBytes)
|
||||
require.True(t, truncated)
|
||||
require.Equal(t, len(raw), *requestBodyBytes)
|
||||
require.LessOrEqual(t, len(*requestBodyJSON), opsMaxStoredRequestBodyBytes)
|
||||
require.Contains(t, *requestBodyJSON, "request_body_truncated")
|
||||
}
|
||||
@ -45,11 +45,11 @@ func TestIsSensitiveKey_TokenBudgetKeysNotRedacted(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSanitizeAndTrimRequestBody_PreservesTokenBudgetFields(t *testing.T) {
|
||||
func TestSanitizeAndTrimJSONPayload_PreservesTokenBudgetFields(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
raw := []byte(`{"model":"claude-3","max_tokens":123,"thinking":{"type":"enabled","budget_tokens":456},"access_token":"abc","messages":[{"role":"user","content":"hi"}]}`)
|
||||
out, _, _ := sanitizeAndTrimRequestBody(raw, 10*1024)
|
||||
out, _, _ := sanitizeAndTrimJSONPayload(raw, 10*1024)
|
||||
if out == "" {
|
||||
t.Fatalf("expected non-empty sanitized output")
|
||||
}
|
||||
|
||||
@ -16,11 +16,6 @@ const (
|
||||
OpsUpstreamErrorDetailKey = "ops_upstream_error_detail"
|
||||
OpsUpstreamErrorsKey = "ops_upstream_errors"
|
||||
|
||||
// Best-effort capture of the current upstream request body so ops can
|
||||
// retry the specific upstream attempt (not just the client request).
|
||||
// This value is sanitized+trimmed before being persisted.
|
||||
OpsUpstreamRequestBodyKey = "ops_upstream_request_body"
|
||||
|
||||
// Optional stage latencies (milliseconds) for troubleshooting and alerting.
|
||||
OpsAuthLatencyMsKey = "ops_auth_latency_ms"
|
||||
OpsRoutingLatencyMsKey = "ops_routing_latency_ms"
|
||||
@ -44,14 +39,6 @@ const (
|
||||
OpsClientBusinessLimitedReasonIPRestriction = "api_key_ip_restriction"
|
||||
)
|
||||
|
||||
func setOpsUpstreamRequestBody(c *gin.Context, body []byte) {
|
||||
if c == nil || len(body) == 0 {
|
||||
return
|
||||
}
|
||||
// 热路径避免 string(body) 额外分配,按需在落库前再转换。
|
||||
c.Set(OpsUpstreamRequestBodyKey, body)
|
||||
}
|
||||
|
||||
func SetOpsLatencyMs(c *gin.Context, key string, value int64) {
|
||||
if c == nil || strings.TrimSpace(key) == "" || value < 0 {
|
||||
return
|
||||
@ -125,10 +112,6 @@ type OpsUpstreamErrorEvent struct {
|
||||
// Helps debug 404/routing errors by showing which endpoint was targeted.
|
||||
UpstreamURL string `json:"upstream_url,omitempty"`
|
||||
|
||||
// Best-effort upstream request capture (sanitized+trimmed).
|
||||
// Required for retrying a specific upstream attempt.
|
||||
UpstreamRequestBody string `json:"upstream_request_body,omitempty"`
|
||||
|
||||
// Best-effort upstream response capture (sanitized+trimmed).
|
||||
UpstreamResponseBody string `json:"upstream_response_body,omitempty"`
|
||||
|
||||
@ -148,7 +131,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
|
||||
}
|
||||
ev.Platform = strings.TrimSpace(ev.Platform)
|
||||
ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID)
|
||||
ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody)
|
||||
ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody)
|
||||
ev.Kind = strings.TrimSpace(ev.Kind)
|
||||
ev.UpstreamURL = strings.TrimSpace(ev.UpstreamURL)
|
||||
@ -158,19 +140,6 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
|
||||
ev.Message = sanitizeUpstreamErrorMessage(ev.Message)
|
||||
}
|
||||
|
||||
// If the caller didn't explicitly pass upstream request body but the gateway
|
||||
// stored it on the context, attach it so ops can retry this specific attempt.
|
||||
if ev.UpstreamRequestBody == "" {
|
||||
if v, ok := c.Get(OpsUpstreamRequestBodyKey); ok {
|
||||
switch raw := v.(type) {
|
||||
case string:
|
||||
ev.UpstreamRequestBody = strings.TrimSpace(raw)
|
||||
case []byte:
|
||||
ev.UpstreamRequestBody = strings.TrimSpace(string(raw))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var existing []*OpsUpstreamErrorEvent
|
||||
if v, ok := c.Get(OpsUpstreamErrorsKey); ok {
|
||||
if arr, ok := v.([]*OpsUpstreamErrorEvent); ok {
|
||||
|
||||
@ -1,10 +1,8 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -28,41 +26,3 @@ func TestSafeUpstreamURL(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendOpsUpstreamError_UsesRequestBodyBytesFromContext(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
setOpsUpstreamRequestBody(c, []byte(`{"model":"gpt-5"}`))
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Kind: "http_error",
|
||||
Message: "upstream failed",
|
||||
})
|
||||
|
||||
v, ok := c.Get(OpsUpstreamErrorsKey)
|
||||
require.True(t, ok)
|
||||
events, ok := v.([]*OpsUpstreamErrorEvent)
|
||||
require.True(t, ok)
|
||||
require.Len(t, events, 1)
|
||||
require.Equal(t, `{"model":"gpt-5"}`, events[0].UpstreamRequestBody)
|
||||
}
|
||||
|
||||
func TestAppendOpsUpstreamError_UsesRequestBodyStringFromContext(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
c.Set(OpsUpstreamRequestBodyKey, `{"model":"gpt-4"}`)
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Kind: "request_error",
|
||||
Message: "dial timeout",
|
||||
})
|
||||
|
||||
v, ok := c.Get(OpsUpstreamErrorsKey)
|
||||
require.True(t, ok)
|
||||
events, ok := v.([]*OpsUpstreamErrorEvent)
|
||||
require.True(t, ok)
|
||||
require.Len(t, events, 1)
|
||||
require.Equal(t, `{"model":"gpt-4"}`, events[0].UpstreamRequestBody)
|
||||
}
|
||||
|
||||
10
backend/internal/service/slice_helpers.go
Normal file
10
backend/internal/service/slice_helpers.go
Normal file
@ -0,0 +1,10 @@
|
||||
package service
|
||||
|
||||
// containsInt64 reports whether target occurs anywhere in values.
// A nil or empty slice never contains the target.
func containsInt64(values []int64, target int64) bool {
	found := false
	// Stop scanning as soon as a match is seen.
	for i := 0; i < len(values) && !found; i++ {
		found = values[i] == target
	}
	return found
}
|
||||
16
backend/migrations/136_remove_ops_retry_replay.sql
Normal file
16
backend/migrations/136_remove_ops_retry_replay.sql
Normal file
@ -0,0 +1,16 @@
|
||||
-- Migration 136: drop the storage behind the unused Ops retry/replay feature.
-- The retry endpoints are no longer exposed, so stored request bodies and
-- retry audit rows only add write width, memory retention, and DB size.

-- Retry audit rows (CASCADE also removes objects that depend on the table).
DROP TABLE IF EXISTS ops_retry_attempts CASCADE;

-- Request-capture and retry bookkeeping columns on the error log.
ALTER TABLE ops_error_logs
    DROP COLUMN IF EXISTS request_body,
    DROP COLUMN IF EXISTS request_headers,
    DROP COLUMN IF EXISTS request_body_truncated,
    DROP COLUMN IF EXISTS request_body_bytes,
    DROP COLUMN IF EXISTS is_retryable,
    DROP COLUMN IF EXISTS retry_count,
    DROP COLUMN IF EXISTS resolved_retry_id;

COMMENT ON TABLE ops_error_logs IS 'Ops error logs (vNext). Stores sanitized error details; request replay storage removed.';
|
||||
Loading…
x
Reference in New Issue
Block a user