sub2api/backend/internal/handler/ops_error_logger_test.go
benjamin ae6ee23e2e fix: 调整 Ops 错误分类的 SLA 排除逻辑
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-18 16:52:06 +08:00

503 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"net/http"
"net/http/httptest"
"sync"
"testing"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
func resetOpsErrorLoggerStateForTest(t *testing.T) {
t.Helper()
opsErrorLogMu.Lock()
ch := opsErrorLogQueue
opsErrorLogQueue = nil
opsErrorLogStopping = true
opsErrorLogMu.Unlock()
if ch != nil {
close(ch)
}
opsErrorLogWorkersWg.Wait()
opsErrorLogOnce = sync.Once{}
opsErrorLogStopOnce = sync.Once{}
opsErrorLogWorkersWg = sync.WaitGroup{}
opsErrorLogMu = sync.RWMutex{}
opsErrorLogStopping = false
opsErrorLogQueueLen.Store(0)
opsErrorLogEnqueued.Store(0)
opsErrorLogDropped.Store(0)
opsErrorLogProcessed.Store(0)
opsErrorLogSanitized.Store(0)
opsErrorLogLastDropLogAt.Store(0)
opsErrorLogShutdownCh = make(chan struct{})
opsErrorLogShutdownOnce = sync.Once{}
opsErrorLogDrained.Store(false)
}
func TestAttachOpsRequestBodyToEntry_SanitizeAndTrim(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
raw := []byte(`{"access_token":"secret-token","messages":[{"role":"user","content":"hello"}]}`)
setOpsRequestContext(c, "claude-3", false, raw)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(c, entry)
require.NotNil(t, entry.RequestBodyBytes)
require.Equal(t, len(raw), *entry.RequestBodyBytes)
require.NotNil(t, entry.RequestBodyJSON)
require.NotContains(t, *entry.RequestBodyJSON, "secret-token")
require.Contains(t, *entry.RequestBodyJSON, "[REDACTED]")
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
}
func TestAttachOpsRequestBodyToEntry_InvalidJSONKeepsSize(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
raw := []byte("not-json")
setOpsRequestContext(c, "claude-3", false, raw)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.NotNil(t, entry.RequestBodyBytes)
require.Equal(t, len(raw), *entry.RequestBodyBytes)
require.False(t, entry.RequestBodyTruncated)
require.Equal(t, int64(1), OpsErrorLogSanitizedTotal())
}
func TestEnqueueOpsErrorLog_QueueFullDrop(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
// 禁止 enqueueOpsErrorLog 触发 workers使用测试队列验证满队列降级。
opsErrorLogOnce.Do(func() {})
opsErrorLogMu.Lock()
opsErrorLogQueue = make(chan opsErrorLogJob, 1)
opsErrorLogMu.Unlock()
ops := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
entry := &service.OpsInsertErrorLogInput{ErrorPhase: "upstream", ErrorType: "upstream_error"}
enqueueOpsErrorLog(ops, entry)
enqueueOpsErrorLog(ops, entry)
require.Equal(t, int64(1), OpsErrorLogEnqueuedTotal())
require.Equal(t, int64(1), OpsErrorLogDroppedTotal())
require.Equal(t, int64(1), OpsErrorLogQueueLength())
}
func TestAttachOpsRequestBodyToEntry_EarlyReturnBranches(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
gin.SetMode(gin.TestMode)
entry := &service.OpsInsertErrorLogInput{}
attachOpsRequestBodyToEntry(nil, entry)
attachOpsRequestBodyToEntry(&gin.Context{}, nil)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
// 无请求体 key
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
require.False(t, entry.RequestBodyTruncated)
// 错误类型
c.Set(opsRequestBodyKey, "not-bytes")
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
// 空 bytes
c.Set(opsRequestBodyKey, []byte{})
attachOpsRequestBodyToEntry(c, entry)
require.Nil(t, entry.RequestBodyJSON)
require.Nil(t, entry.RequestBodyBytes)
require.Equal(t, int64(0), OpsErrorLogSanitizedTotal())
}
func TestEnqueueOpsErrorLog_EarlyReturnBranches(t *testing.T) {
resetOpsErrorLoggerStateForTest(t)
ops := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
entry := &service.OpsInsertErrorLogInput{ErrorPhase: "upstream", ErrorType: "upstream_error"}
// nil 入参分支
enqueueOpsErrorLog(nil, entry)
enqueueOpsErrorLog(ops, nil)
require.Equal(t, int64(0), OpsErrorLogEnqueuedTotal())
// shutdown 分支
close(opsErrorLogShutdownCh)
enqueueOpsErrorLog(ops, entry)
require.Equal(t, int64(0), OpsErrorLogEnqueuedTotal())
// stopping 分支
resetOpsErrorLoggerStateForTest(t)
opsErrorLogMu.Lock()
opsErrorLogStopping = true
opsErrorLogMu.Unlock()
enqueueOpsErrorLog(ops, entry)
require.Equal(t, int64(0), OpsErrorLogEnqueuedTotal())
// queue nil 分支(防止启动 worker 干扰)
resetOpsErrorLoggerStateForTest(t)
opsErrorLogOnce.Do(func() {})
opsErrorLogMu.Lock()
opsErrorLogQueue = nil
opsErrorLogMu.Unlock()
enqueueOpsErrorLog(ops, entry)
require.Equal(t, int64(0), OpsErrorLogEnqueuedTotal())
}
func TestOpsCaptureWriterPool_ResetOnRelease(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodGet, "/test", nil)
writer := acquireOpsCaptureWriter(c.Writer)
require.NotNil(t, writer)
_, err := writer.buf.WriteString("temp-error-body")
require.NoError(t, err)
releaseOpsCaptureWriter(writer)
reused := acquireOpsCaptureWriter(c.Writer)
defer releaseOpsCaptureWriter(reused)
require.Zero(t, reused.buf.Len(), "writer should be reset before reuse")
}
func TestOpsErrorLoggerMiddleware_DoesNotBreakOuterMiddlewares(t *testing.T) {
gin.SetMode(gin.TestMode)
r := gin.New()
r.Use(middleware2.Recovery())
r.Use(middleware2.RequestLogger())
r.Use(middleware2.Logger())
r.GET("/v1/messages", OpsErrorLoggerMiddleware(nil), func(c *gin.Context) {
c.Status(http.StatusNoContent)
})
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/v1/messages", nil)
require.NotPanics(t, func() {
r.ServeHTTP(rec, req)
})
require.Equal(t, http.StatusNoContent, rec.Code)
}
func TestIsKnownOpsErrorType(t *testing.T) {
known := []string{
"invalid_request_error",
"authentication_error",
"rate_limit_error",
"billing_error",
"subscription_error",
"upstream_error",
"overloaded_error",
"api_error",
"not_found_error",
"forbidden_error",
}
for _, k := range known {
require.True(t, isKnownOpsErrorType(k), "expected known: %s", k)
}
unknown := []string{"<nil>", "null", "", "random_error", "some_new_type", "<nil>\u003e"}
for _, u := range unknown {
require.False(t, isKnownOpsErrorType(u), "expected unknown: %q", u)
}
}
func TestNormalizeOpsErrorType(t *testing.T) {
tests := []struct {
name string
errType string
code string
want string
}{
// Known types pass through.
{"known invalid_request_error", "invalid_request_error", "", "invalid_request_error"},
{"known rate_limit_error", "rate_limit_error", "", "rate_limit_error"},
{"known upstream_error", "upstream_error", "", "upstream_error"},
// Unknown/garbage types are rejected and fall through to code-based or default.
{"nil literal from upstream", "<nil>", "", "api_error"},
{"null string", "null", "", "api_error"},
{"random string", "something_weird", "", "api_error"},
// Unknown type but known code still maps correctly.
{"nil with INSUFFICIENT_BALANCE code", "<nil>", "INSUFFICIENT_BALANCE", "billing_error"},
{"nil with USAGE_LIMIT_EXCEEDED code", "<nil>", "USAGE_LIMIT_EXCEEDED", "subscription_error"},
// Empty type falls through to code-based mapping.
{"empty type with balance code", "", "INSUFFICIENT_BALANCE", "billing_error"},
{"empty type with subscription code", "", "SUBSCRIPTION_NOT_FOUND", "subscription_error"},
{"empty type no code", "", "", "api_error"},
// Known type overrides conflicting code-based mapping.
{"known type overrides conflicting code", "rate_limit_error", "INSUFFICIENT_BALANCE", "rate_limit_error"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := normalizeOpsErrorType(tt.errType, tt.code)
require.Equal(t, tt.want, got)
})
}
}
func TestClassifyOpsNoAvailableAccountsExcludedFromSLA(t *testing.T) {
const message = "No available accounts"
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
markOpsRoutingCapacityLimited(c)
errType := normalizeOpsErrorType("api_error", "")
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, message, "", http.StatusServiceUnavailable)
require.Equal(t, "api_error", errType)
require.Equal(t, "routing", phase)
require.True(t, isBusinessLimited)
require.Equal(t, "platform", errorOwner)
require.Equal(t, "gateway", errorSource)
}
func TestClassifyOpsRoutingCapacityMarkerExcludesMaskedSelectionFailureFromSLA(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
markOpsRoutingCapacityLimited(c)
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(
c,
"api_error",
"Service temporarily unavailable",
"",
http.StatusServiceUnavailable,
)
require.Equal(t, "routing", phase)
require.True(t, isBusinessLimited)
require.Equal(t, "platform", errorOwner)
require.Equal(t, "gateway", errorSource)
}
func TestClassifyOpsAuthClientErrorsExcludedFromSLA(t *testing.T) {
tests := []struct {
name string
errType string
message string
code string
status int
}{
{
name: "standard invalid API key",
errType: "api_error",
message: "Invalid API key",
code: "INVALID_API_KEY",
status: http.StatusUnauthorized,
},
{
name: "standard missing API key",
errType: "api_error",
message: "API key is required in Authorization header (Bearer scheme), x-api-key header, or x-goog-api-key header",
code: "API_KEY_REQUIRED",
status: http.StatusUnauthorized,
},
{
name: "google invalid API key",
errType: "api_error",
message: "Invalid API key",
code: "401",
status: http.StatusUnauthorized,
},
{
name: "google missing API key",
errType: "api_error",
message: "API key is required",
code: "401",
status: http.StatusUnauthorized,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
errType := normalizeOpsErrorType(tt.errType, tt.code)
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, tt.message, tt.code, tt.status)
require.Equal(t, "api_error", errType)
require.Equal(t, "auth", phase)
require.True(t, isBusinessLimited)
require.Equal(t, "client", errorOwner)
require.Equal(t, "client_request", errorSource)
})
}
}
func TestClassifyOpsUnsupportedModelExcludedFromSLA(t *testing.T) {
tests := []string{
"No available accounts: no available accounts supporting model: made-up-model",
"No available accounts: no available OpenAI accounts supporting model: made-up-model",
"No available Gemini accounts: no available Gemini accounts supporting model: made-up-model",
"No available accounts: no available accounts supporting model: made-up-model (channel pricing restriction)",
}
for _, message := range tests {
t.Run(message, func(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
markOpsRoutingCapacityLimited(c)
errType := normalizeOpsErrorType("api_error", "")
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(c, errType, message, "", http.StatusServiceUnavailable)
require.Equal(t, "api_error", errType)
require.Equal(t, "routing", phase)
require.True(t, isBusinessLimited)
require.Equal(t, "platform", errorOwner)
require.Equal(t, "gateway", errorSource)
})
}
}
func TestClassifyOpsUnmarkedNoAvailableTextStillCountsForSLA(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(
c,
"api_error",
"No available accounts",
"",
http.StatusServiceUnavailable,
)
require.Equal(t, "routing", phase)
require.False(t, isBusinessLimited)
require.Equal(t, "platform", errorOwner)
require.Equal(t, "gateway", errorSource)
}
func TestClassifyOpsUpstreamAuthTextStillCountsForSLA(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
service.SetOpsUpstreamError(c, http.StatusUnauthorized, "Invalid API key", "")
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(
c,
"api_error",
"Invalid API key",
"401",
http.StatusUnauthorized,
)
require.Equal(t, "upstream", phase)
require.False(t, isBusinessLimited)
require.Equal(t, "provider", errorOwner)
require.Equal(t, "upstream_http", errorSource)
}
func TestClassifyOpsUpstreamNoAvailableTextStillCountsForSLA(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
service.SetOpsUpstreamError(c, http.StatusServiceUnavailable, "No available accounts", "")
phase, isBusinessLimited, errorOwner, errorSource := classifyOpsErrorLog(
c,
"api_error",
"No available accounts",
"",
http.StatusServiceUnavailable,
)
require.Equal(t, "upstream", phase)
require.False(t, isBusinessLimited)
require.Equal(t, "provider", errorOwner)
require.Equal(t, "upstream_http", errorSource)
}
func TestSetOpsEndpointContext_SetsContextKeys(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
setOpsEndpointContext(c, "claude-3-5-sonnet-20241022", int16(2)) // stream
v, ok := c.Get(opsUpstreamModelKey)
require.True(t, ok)
vStr, ok := v.(string)
require.True(t, ok)
require.Equal(t, "claude-3-5-sonnet-20241022", vStr)
rt, ok := c.Get(opsRequestTypeKey)
require.True(t, ok)
rtVal, ok := rt.(int16)
require.True(t, ok)
require.Equal(t, int16(2), rtVal)
}
func TestSetOpsEndpointContext_EmptyModelNotStored(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil)
setOpsEndpointContext(c, "", int16(1))
_, ok := c.Get(opsUpstreamModelKey)
require.False(t, ok, "empty upstream model should not be stored")
rt, ok := c.Get(opsRequestTypeKey)
require.True(t, ok)
rtVal, ok := rt.(int16)
require.True(t, ok)
require.Equal(t, int16(1), rtVal)
}
func TestSetOpsEndpointContext_NilContext(t *testing.T) {
require.NotPanics(t, func() {
setOpsEndpointContext(nil, "model", int16(1))
})
}