From 82bc1e199f28b58d109aef0340f42823f5900b8d Mon Sep 17 00:00:00 2001 From: win Date: Wed, 20 May 2026 22:43:20 +0800 Subject: [PATCH] chore: remove unused real-time log stream / request event bus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 删除 fork 独有的实时日志相关功能(上游 Wei-Shaw/sub2api 不存在): A. OpsLogBroadcaster + SSE 日志流(前端有用但用户不需要): - backend/internal/service/ops_log_broadcaster{,_test}.go - backend/internal/handler/ops_log_stream_middleware.go - backend/internal/handler/admin/ops_log_stream_handler.go - backend/internal/server/routes/admin.go: GET /admin/ops/logs/{stream,recent} - backend/internal/server/routes/{gateway,windsurf_gateway}.go: opsLogStream middleware - backend/internal/service/wire.go: ProvideOpsLogBroadcaster - frontend/src/views/admin/ops/OpsLogStreamView.vue - frontend/src/api/admin/ops.ts: subscribeOpsLogStream, getRecentOpsLogs, OpsLogEntry/OpsLogFilter/OpsLogRecentResponse 类型 - frontend/src/router/index.ts: AdminOpsLogStream 路由 - frontend/src/components/layout/AppSidebar.vue: 侧边栏入口 - frontend/src/i18n/locales/{en,zh}.ts: nav.opsLogStream + admin.ops.logStream 全部文案 B. RequestEventBus + WS 请求事件流(前端零调用 dead code): - backend/internal/service/request_event_bus{,_test}.go - backend/internal/handler/admin/ops_ws_requests_handler.go - backend/internal/server/routes/admin.go: GET /admin/ops/ws/requests - backend/internal/handler/gateway_handler.go: RequestEventBus 字段/参数 + reqStartTime + reqEventAccountID/reqEventStatus 跟踪 + defer Publish - backend/internal/service/wire.go: NewRequestEventBus - backend/internal/handler/admin/ops_handler.go: OpsHandler 中 requestEventBus + logBroadcaster 字段,简化 NewOpsHandler 签名 保留: - /admin/ops/ws/qps (前端 QPS 监控仍在用) - /admin/ops/realtime-traffic (前端在用) - OpsErrorLoggerMiddleware (与本次无关) 签名变更: - NewOpsHandler(opsService) — 移除 requestEventBus, logBroadcaster - NewGatewayHandler(...): 移除 requestEventBus 末位参数 - ProvideRouter / SetupRouter / registerRoutes / RegisterGatewayRoutes / RegisterWindsurfGatewayRoutes: 移除 opsLogBroadcaster 参数 - 同步更新 wire_gen.go + 测试调用点 验证: - 后端 go build/vet 通过 - 前端 pnpm run build 通过 (9.48s) - 测试: 2 个 baseline 既存失败 (TestProxyImportData..., TestWindsurfTierAccessService_Snapshot_HappyPath) 与本次无关 --- backend/cmd/server/wire_gen.go | 8 +- backend/internal/handler/admin/ops_handler.go | 8 +- .../handler/admin/ops_log_stream_handler.go | 210 ---------- .../admin/ops_runtime_logging_handler_test.go | 6 +- .../admin/ops_system_log_handler_test.go | 28 +- .../handler/admin/ops_ws_requests_handler.go | 198 --------- backend/internal/handler/gateway_handler.go | 35 -- .../handler/ops_log_stream_middleware.go | 100 ----- backend/internal/server/http.go | 3 +- backend/internal/server/router.go | 8 +- backend/internal/server/routes/admin.go | 5 +- backend/internal/server/routes/gateway.go | 3 - .../internal/server/routes/gateway_test.go | 1 - .../server/routes/windsurf_gateway.go | 3 - .../internal/service/ops_log_broadcaster.go | 237 ----------- .../service/ops_log_broadcaster_test.go | 267 ------------ backend/internal/service/request_event_bus.go | 75 ---- .../service/request_event_bus_test.go | 100 ----- backend/internal/service/wire.go | 13 - frontend/src/api/admin/ops.ts | 185 +-------- frontend/src/components/layout/AppSidebar.vue | 1 - frontend/src/i18n/locales/en.ts | 34 -- frontend/src/i18n/locales/zh.ts | 34 -- frontend/src/router/index.ts | 12 - .../src/views/admin/ops/OpsLogStreamView.vue | 390 ------------------ 25 files changed, 29 insertions(+), 1935 deletions(-) delete mode 100644 backend/internal/handler/admin/ops_log_stream_handler.go delete mode 100644 backend/internal/handler/admin/ops_ws_requests_handler.go delete mode 100644 backend/internal/handler/ops_log_stream_middleware.go delete mode 100644 backend/internal/service/ops_log_broadcaster.go delete mode 100644 backend/internal/service/ops_log_broadcaster_test.go delete mode 100644 backend/internal/service/request_event_bus.go delete mode 100644 backend/internal/service/request_event_bus_test.go delete mode 100644 frontend/src/views/admin/ops/OpsLogStreamView.vue diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index af535d5c..3e72a49d 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -207,9 +207,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { defaultLoadBalancer := payment.ProvideDefaultLoadBalancer(client, encryptionKey) paymentService := service.ProvidePaymentService(client, registry, defaultLoadBalancer, redeemService, subscriptionService, paymentConfigService, userRepository, groupRepository, affiliateService, notificationEmailService) settingHandler := handler.ProvideAdminSettingHandler(settingService, emailService, turnstileService, opsService, paymentConfigService, paymentService, userAttributeService, notificationEmailService) - requestEventBus := service.NewRequestEventBus() - opsLogBroadcaster := service.ProvideOpsLogBroadcaster() - opsHandler := admin.NewOpsHandler(opsService, requestEventBus, opsLogBroadcaster) + opsHandler := admin.NewOpsHandler(opsService) updateCache := repository.NewUpdateCache(redisClient) gitHubReleaseClient := repository.ProvideGitHubReleaseClient(configConfig) serviceBuildInfo := provideServiceBuildInfo(buildInfo) @@ -252,7 +250,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig) userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient) userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig) - gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, windsurfGatewayService, userService, concurrencyService, billingCacheService, usageService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, userMessageQueueService, configConfig, settingService, requestEventBus) + gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, windsurfGatewayService, userService, concurrencyService, billingCacheService, usageService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, userMessageQueueService, configConfig, settingService) openAIGatewayHandler := handler.NewOpenAIGatewayHandler(openAIGatewayService, concurrencyService, billingCacheService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, configConfig) handlerSettingHandler := handler.ProvideSettingHandler(settingService, buildInfo, notificationEmailService) totpHandler := handler.NewTotpHandler(totpService) @@ -266,7 +264,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService) apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig) healthService := service.NewHealthService(db, redisClient) - engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, healthService, redisClient, opsLogBroadcaster) + engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, healthService, redisClient) httpServer := server.ProvideHTTPServer(configConfig, engine) opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig) opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) diff --git a/backend/internal/handler/admin/ops_handler.go b/backend/internal/handler/admin/ops_handler.go index 7e05fcbd..418c302f 100644 --- a/backend/internal/handler/admin/ops_handler.go +++ b/backend/internal/handler/admin/ops_handler.go @@ -14,9 +14,7 @@ import ( ) type OpsHandler struct { - opsService *service.OpsService - requestEventBus *service.RequestEventBus - logBroadcaster *service.OpsLogBroadcaster + opsService *service.OpsService } // GetErrorLogByID returns ops error log detail. @@ -70,8 +68,8 @@ func parseOpsViewParam(c *gin.Context) string { } } -func NewOpsHandler(opsService *service.OpsService, requestEventBus *service.RequestEventBus, logBroadcaster *service.OpsLogBroadcaster) *OpsHandler { - return &OpsHandler{opsService: opsService, requestEventBus: requestEventBus, logBroadcaster: logBroadcaster} +func NewOpsHandler(opsService *service.OpsService) *OpsHandler { + return &OpsHandler{opsService: opsService} } // GetErrorLogs lists ops error logs. diff --git a/backend/internal/handler/admin/ops_log_stream_handler.go b/backend/internal/handler/admin/ops_log_stream_handler.go deleted file mode 100644 index fd41f3ef..00000000 --- a/backend/internal/handler/admin/ops_log_stream_handler.go +++ /dev/null @@ -1,210 +0,0 @@ -package admin - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "strings" - "time" - - "github.com/Wei-Shaw/sub2api/internal/pkg/response" - "github.com/Wei-Shaw/sub2api/internal/service" - "github.com/gin-gonic/gin" -) - -const ( - opsLogStreamHeartbeat = 25 * time.Second - opsLogStreamRecentMax = 200 - opsLogStreamSubBufEntries = 1024 - opsLogStreamModelMaxLen = 256 -) - -// LogStream serves a Server-Sent Events feed of every gateway request. -// -// GET /api/v1/admin/ops/logs/stream?min_status=400&model=glm-4.7&account_id=42&min_latency_ms=2000 -// -// Filter query params (all optional, AND-combined): -// -// min_status — int only emit when entry.status >= this value -// model — exact match on model key -// account_id — int64 -// group_id — int64 -// min_latency_ms — int64 only emit when entry.latency_ms >= this value -// -// The handler keeps the connection open until the client disconnects, the -// monitoring is disabled, or the broadcaster is torn down. A heartbeat -// comment line is sent every 25s so reverse proxies don't time out idle -// streams. -func (h *OpsHandler) LogStream(c *gin.Context) { - if h.logBroadcaster == nil { - response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured") - return - } - // nil opsService is allowed for lightweight deployments / tests where - // the OpsService dependency is intentionally absent. The admin auth - // middleware on the route group still enforces JWT + admin role, so - // the stream is never reachable anonymously. - if h.opsService != nil { - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - } - - filter, err := parseOpsLogFilter(c) - if err != nil { - response.BadRequest(c, err.Error()) - return - } - - c.Header("Content-Type", "text/event-stream") - c.Header("Cache-Control", "no-cache") - c.Header("Connection", "keep-alive") - c.Header("X-Accel-Buffering", "no") - - flusher, ok := c.Writer.(http.Flusher) - if !ok { - response.Error(c, http.StatusInternalServerError, "streaming unsupported") - return - } - - ch, unsubscribe := h.logBroadcaster.Subscribe(filter, opsLogStreamSubBufEntries) - defer unsubscribe() - - // Prime client with recent buffered history (so a fresh dashboard tab - // renders something immediately instead of staying blank). - for _, e := range h.logBroadcaster.Snapshot(filter, opsLogStreamRecentMax) { - if err := writeOpsLogSSE(c.Writer, &e); err != nil { - return - } - } - flusher.Flush() - - heartbeat := time.NewTicker(opsLogStreamHeartbeat) - defer heartbeat.Stop() - - ctxDone := c.Request.Context().Done() - for { - select { - case <-ctxDone: - return - case <-heartbeat.C: - if _, err := io.WriteString(c.Writer, ": ping\n\n"); err != nil { - return - } - flusher.Flush() - case entry := <-ch: - if err := writeOpsLogSSE(c.Writer, &entry); err != nil { - return - } - flusher.Flush() - } - } -} - -// LogStreamRecent returns the broadcaster history without subscribing. -// Useful for one-shot polling when the admin panel cannot keep an open -// SSE connection (e.g. behind a buffering proxy). -// -// GET /api/v1/admin/ops/logs/recent?min_status=400&max=500 -func (h *OpsHandler) LogStreamRecent(c *gin.Context) { - if h.logBroadcaster == nil { - response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured") - return - } - if h.opsService != nil { - if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { - response.ErrorFrom(c, err) - return - } - } - filter, err := parseOpsLogFilter(c) - if err != nil { - response.BadRequest(c, err.Error()) - return - } - maxN := opsLogStreamRecentMax - if v := strings.TrimSpace(c.Query("max")); v != "" { - if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 2000 { - maxN = n - } - } - - published, dropped, subs := h.logBroadcaster.Stats() - response.Success(c, gin.H{ - "entries": h.logBroadcaster.Snapshot(filter, maxN), - "published_total": published, - "dropped_total": dropped, - "subscribers": subs, - }) -} - -func parseOpsLogFilter(c *gin.Context) (service.OpsLogFilter, error) { - f := service.OpsLogFilter{} - if v := strings.TrimSpace(c.Query("min_status")); v != "" { - n, err := strconv.Atoi(v) - if err != nil || n < 0 { - return f, fmt.Errorf("invalid min_status") - } - f.MinStatus = n - } - if v := strings.TrimSpace(c.Query("model")); v != "" { - // Cap input to keep an authenticated admin from stuffing huge - // strings into long-lived subscription state. - if len(v) > opsLogStreamModelMaxLen { - return f, fmt.Errorf("model too long (max %d)", opsLogStreamModelMaxLen) - } - f.Model = v - } - if v := strings.TrimSpace(c.Query("account_id")); v != "" { - n, err := strconv.ParseInt(v, 10, 64) - // Reject 0 — matches() treats AccountID==0 as "match all", so a - // param of 0 would silently degrade to no-filter without telling - // the user. Demand a positive id (callers wanting all should omit - // the param). - if err != nil || n <= 0 { - return f, fmt.Errorf("invalid account_id") - } - f.AccountID = n - } - if v := strings.TrimSpace(c.Query("group_id")); v != "" { - n, err := strconv.ParseInt(v, 10, 64) - if err != nil || n <= 0 { - return f, fmt.Errorf("invalid group_id") - } - f.GroupID = n - } - if v := strings.TrimSpace(c.Query("min_latency_ms")); v != "" { - n, err := strconv.ParseInt(v, 10, 64) - if err != nil || n < 0 { - return f, fmt.Errorf("invalid min_latency_ms") - } - f.MinLatencyMs = n - } - return f, nil -} - -// writeOpsLogSSE writes one SSE frame: an `event: log` line followed by a -// single `data:` line containing the JSON-encoded entry, terminated by a -// blank line per the SSE protocol. -// -// This assumes the JSON payload contains no bare LF — which holds because -// every string field in OpsLogEntry passes through encoding/json escaping. -// If a future field is added that emits raw bytes (e.g. a []byte body), -// the marshalled output must be split across multiple `data:` lines. -func writeOpsLogSSE(w io.Writer, e *service.OpsLogEntry) error { - payload, err := json.Marshal(e) - if err != nil { - return err - } - if _, err := io.WriteString(w, "event: log\ndata: "); err != nil { - return err - } - if _, err := w.Write(payload); err != nil { - return err - } - _, err = io.WriteString(w, "\n\n") - return err -} diff --git a/backend/internal/handler/admin/ops_runtime_logging_handler_test.go b/backend/internal/handler/admin/ops_runtime_logging_handler_test.go index 723138c2..0e84b4f9 100644 --- a/backend/internal/handler/admin/ops_runtime_logging_handler_test.go +++ b/backend/internal/handler/admin/ops_runtime_logging_handler_test.go @@ -116,7 +116,7 @@ func newRuntimeOpsService(t *testing.T) *service.OpsService { } func TestOpsRuntimeLoggingHandler_GetConfig(t *testing.T) { - h := NewOpsHandler(newRuntimeOpsService(t), nil, nil) + h := NewOpsHandler(newRuntimeOpsService(t)) r := newOpsRuntimeRouter(h, false) w := httptest.NewRecorder() @@ -128,7 +128,7 @@ func TestOpsRuntimeLoggingHandler_GetConfig(t *testing.T) { } func TestOpsRuntimeLoggingHandler_UpdateUnauthorized(t *testing.T) { - h := NewOpsHandler(newRuntimeOpsService(t), nil, nil) + h := NewOpsHandler(newRuntimeOpsService(t)) r := newOpsRuntimeRouter(h, false) body := `{"level":"debug","enable_sampling":false,"sampling_initial":100,"sampling_thereafter":100,"caller":true,"stacktrace_level":"error","retention_days":30}` @@ -142,7 +142,7 @@ func TestOpsRuntimeLoggingHandler_UpdateUnauthorized(t *testing.T) { } func TestOpsRuntimeLoggingHandler_UpdateAndResetSuccess(t *testing.T) { - h := NewOpsHandler(newRuntimeOpsService(t), nil, nil) + h := NewOpsHandler(newRuntimeOpsService(t)) r := newOpsRuntimeRouter(h, true) payload := map[string]any{ diff --git a/backend/internal/handler/admin/ops_system_log_handler_test.go b/backend/internal/handler/admin/ops_system_log_handler_test.go index 9599fc11..7528acd8 100644 --- a/backend/internal/handler/admin/ops_system_log_handler_test.go +++ b/backend/internal/handler/admin/ops_system_log_handler_test.go @@ -35,7 +35,7 @@ func newOpsSystemLogTestRouter(handler *OpsHandler, withUser bool) *gin.Engine { } func TestOpsSystemLogHandler_ListUnavailable(t *testing.T) { - h := NewOpsHandler(nil, nil, nil) + h := NewOpsHandler(nil) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -48,7 +48,7 @@ func TestOpsSystemLogHandler_ListUnavailable(t *testing.T) { func TestOpsSystemLogHandler_ListInvalidUserID(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -61,7 +61,7 @@ func TestOpsSystemLogHandler_ListInvalidUserID(t *testing.T) { func TestOpsSystemLogHandler_ListInvalidAccountID(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -76,7 +76,7 @@ func TestOpsSystemLogHandler_ListMonitoringDisabled(t *testing.T) { svc := service.NewOpsService(nil, nil, &config.Config{ Ops: config.OpsConfig{Enabled: false}, }, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -89,7 +89,7 @@ func TestOpsSystemLogHandler_ListMonitoringDisabled(t *testing.T) { func TestOpsSystemLogHandler_ListSuccess(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -110,7 +110,7 @@ func TestOpsSystemLogHandler_ListSuccess(t *testing.T) { func TestOpsSystemLogHandler_CleanupUnauthorized(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -124,7 +124,7 @@ func TestOpsSystemLogHandler_CleanupUnauthorized(t *testing.T) { func TestOpsSystemLogHandler_CleanupInvalidPayload(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, true) w := httptest.NewRecorder() @@ -138,7 +138,7 @@ func TestOpsSystemLogHandler_CleanupInvalidPayload(t *testing.T) { func TestOpsSystemLogHandler_CleanupInvalidTime(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, true) w := httptest.NewRecorder() @@ -152,7 +152,7 @@ func TestOpsSystemLogHandler_CleanupInvalidTime(t *testing.T) { func TestOpsSystemLogHandler_CleanupInvalidEndTime(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, true) w := httptest.NewRecorder() @@ -166,7 +166,7 @@ func TestOpsSystemLogHandler_CleanupInvalidEndTime(t *testing.T) { func TestOpsSystemLogHandler_CleanupServiceUnavailable(t *testing.T) { svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, true) w := httptest.NewRecorder() @@ -182,7 +182,7 @@ func TestOpsSystemLogHandler_CleanupMonitoringDisabled(t *testing.T) { svc := service.NewOpsService(nil, nil, &config.Config{ Ops: config.OpsConfig{Enabled: false}, }, nil, nil, nil, nil, nil, nil, nil, nil) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, true) w := httptest.NewRecorder() @@ -197,7 +197,7 @@ func TestOpsSystemLogHandler_CleanupMonitoringDisabled(t *testing.T) { func TestOpsSystemLogHandler_Health(t *testing.T) { sink := service.NewOpsSystemLogSink(nil) svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, sink) - h := NewOpsHandler(svc, nil, nil) + h := NewOpsHandler(svc) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -209,7 +209,7 @@ func TestOpsSystemLogHandler_Health(t *testing.T) { } func TestOpsSystemLogHandler_HealthUnavailableAndMonitoringDisabled(t *testing.T) { - h := NewOpsHandler(nil, nil, nil) + h := NewOpsHandler(nil) r := newOpsSystemLogTestRouter(h, false) w := httptest.NewRecorder() @@ -222,7 +222,7 @@ func TestOpsSystemLogHandler_HealthUnavailableAndMonitoringDisabled(t *testing.T svc := service.NewOpsService(nil, nil, &config.Config{ Ops: config.OpsConfig{Enabled: false}, }, nil, nil, nil, nil, nil, nil, nil, nil) - h = NewOpsHandler(svc, nil, nil) + h = NewOpsHandler(svc) r = newOpsSystemLogTestRouter(h, false) w = httptest.NewRecorder() req = httptest.NewRequest(http.MethodGet, "/logs/health", nil) diff --git a/backend/internal/handler/admin/ops_ws_requests_handler.go b/backend/internal/handler/admin/ops_ws_requests_handler.go deleted file mode 100644 index 323018c6..00000000 --- a/backend/internal/handler/admin/ops_ws_requests_handler.go +++ /dev/null @@ -1,198 +0,0 @@ -package admin - -import ( - "context" - "encoding/json" - "net/http" - "sync" - "time" - - "github.com/Wei-Shaw/sub2api/internal/pkg/logger" - "github.com/Wei-Shaw/sub2api/internal/service" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" -) - -type requestStreamWSMessage struct { - Type string `json:"type"` - Data service.RequestEvent `json:"data"` -} - -// RequestStreamWSHandler streams real-time request events to WebSocket clients. -// GET /api/v1/admin/ops/ws/requests -// -// Each connected client receives a JSON message per gateway dispatch: -// -// {"type":"request_event","data":{"timestamp":...,"method":"POST","path":"/v1/messages", -// "model":"claude-3-5-sonnet-20241022","account_id":42,"status":"success","latency_ms":1230}} -func (h *OpsHandler) RequestStreamWSHandler(c *gin.Context) { - clientIP := requestClientIP(c.Request) - - if h == nil || h.opsService == nil { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"}) - return - } - if h.requestEventBus == nil { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "request event bus not initialized"}) - return - } - - if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) { - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"}) - return - } - closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled") - return - } - - if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns) - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"}) - return - } - defer func() { - if wsConnCount.Add(-1) == 0 { - scheduleQPSWSIdleStop() - } - }() - - if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" { - if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] per-ip limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP) - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"}) - return - } - defer releaseOpsWSIPSlot(clientIP) - } - - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] upgrade failed: %v", err) - return - } - defer func() { _ = conn.Close() }() - - handleRequestStreamWebSocket(c.Request.Context(), conn, h.requestEventBus) -} - -func handleRequestStreamWebSocket(parentCtx context.Context, conn *websocket.Conn, bus *service.RequestEventBus) { - if conn == nil || bus == nil { - return - } - - ctx, cancel := context.WithCancel(parentCtx) - defer cancel() - - subID, eventCh := bus.Subscribe() - defer bus.Unsubscribe(subID) - - var closeOnce sync.Once - closeConn := func() { - closeOnce.Do(func() { _ = conn.Close() }) - } - - closeFrameCh := make(chan []byte, 1) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - conn.SetReadLimit(qpsWSMaxReadBytes) - if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] set read deadline failed: %v", err) - return - } - conn.SetPongHandler(func(string) error { - return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)) - }) - conn.SetCloseHandler(func(code int, text string) error { - select { - case closeFrameCh <- websocket.FormatCloseMessage(code, text): - default: - } - cancel() - return nil - }) - - for { - _, _, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] read failed: %v", err) - } - return - } - } - }() - - pingTicker := time.NewTicker(qpsWSPingInterval) - defer pingTicker.Stop() - - writeWithTimeout := func(messageType int, data []byte) error { - if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil { - return err - } - return conn.WriteMessage(messageType, data) - } - - sendClose := func(closeFrame []byte) { - if closeFrame == nil { - closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - } - _ = writeWithTimeout(websocket.CloseMessage, closeFrame) - } - - for { - select { - case evt, ok := <-eventCh: - if !ok { - // channel closed by Unsubscribe - sendClose(nil) - closeConn() - wg.Wait() - return - } - msg, err := json.Marshal(requestStreamWSMessage{Type: "request_event", Data: evt}) - if err != nil { - continue - } - if err := writeWithTimeout(websocket.TextMessage, msg); err != nil { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] write failed: %v", err) - cancel() - closeConn() - wg.Wait() - return - } - - case <-pingTicker.C: - if err := writeWithTimeout(websocket.PingMessage, nil); err != nil { - logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] ping failed: %v", err) - cancel() - closeConn() - wg.Wait() - return - } - - case closeFrame := <-closeFrameCh: - sendClose(closeFrame) - closeConn() - wg.Wait() - return - - case <-ctx.Done(): - var closeFrame []byte - select { - case closeFrame = <-closeFrameCh: - default: - } - sendClose(closeFrame) - closeConn() - wg.Wait() - return - } - } -} diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 406ed819..0902f6e5 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -50,7 +50,6 @@ type GatewayHandler struct { contentModerationService *service.ContentModerationService concurrencyHelper *ConcurrencyHelper userMsgQueueHelper *UserMsgQueueHelper - requestEventBus *service.RequestEventBus maxAccountSwitches int maxAccountSwitchesGemini int cfg *config.Config @@ -74,7 +73,6 @@ func NewGatewayHandler( userMsgQueueService *service.UserMessageQueueService, cfg *config.Config, settingService *service.SettingService, - requestEventBus *service.RequestEventBus, ) *GatewayHandler { pingInterval := time.Duration(0) maxAccountSwitches := 10 @@ -109,7 +107,6 @@ func NewGatewayHandler( contentModerationService: contentModerationService, concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval), userMsgQueueHelper: umqHelper, - requestEventBus: requestEventBus, maxAccountSwitches: maxAccountSwitches, maxAccountSwitchesGemini: maxAccountSwitchesGemini, cfg: cfg, @@ -120,7 +117,6 @@ func NewGatewayHandler( // Messages handles Claude API compatible messages endpoint // POST /v1/messages func (h *GatewayHandler) Messages(c *gin.Context) { - reqStartTime := time.Now() // 从context获取apiKey和user(ApiKeyAuth中间件已设置) apiKey, ok := middleware2.GetAPIKeyFromContext(c) if !ok { @@ -172,25 +168,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 解析渠道级模型映射 channelMapping, _ := h.gatewayService.ResolveChannelMappingAndRestrict(c.Request.Context(), apiKey.GroupID, reqModel) - // 实时请求查看器:记录每次请求的结果(账号、模型、状态、延迟) - var ( - reqEventAccountID int64 - reqEventStatus = "error" - ) - defer func() { - if h.requestEventBus != nil { - h.requestEventBus.Publish(service.RequestEvent{ - Timestamp: reqStartTime, - Method: c.Request.Method, - Path: c.FullPath(), - Model: reqModel, - AccountID: reqEventAccountID, - Status: reqEventStatus, - LatencyMS: time.Since(reqStartTime).Milliseconds(), - }) - } - }() - // 设置 max_tokens=1 + haiku 探测请求标识到 context 中 // 必须在 SetClaudeCodeClientContext 之前设置,因为 ClaudeCodeValidator 需要读取此标识进行绕过判断 if isMaxTokensOneHaikuRequest(reqModel, parsedReq.MaxTokens, reqStream) { @@ -463,8 +440,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if accountReleaseFunc != nil { accountReleaseFunc() } - reqEventAccountID = account.ID - reqEventStatus = "rate_limited" h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "RPM rate limit exceeded, please retry later", streamStarted) return } @@ -537,10 +512,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { return } - // 实时请求查看器:标记 Gemini 路径成功 - reqEventAccountID = account.ID - reqEventStatus = "success" - // RPM 计数递增(Forward 成功后) // 注意:TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。 // 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。 @@ -750,8 +721,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if accountReleaseFunc != nil { accountReleaseFunc() } - reqEventAccountID = account.ID - reqEventStatus = "rate_limited" h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "RPM rate limit exceeded, please retry later", streamStarted) return } @@ -957,10 +926,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { return } - // 实时请求查看器:标记 Anthropic 路径成功 - reqEventAccountID = account.ID - reqEventStatus = "success" - // RPM 计数递增(Forward 成功后) // 注意:TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。 // 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。 diff --git a/backend/internal/handler/ops_log_stream_middleware.go b/backend/internal/handler/ops_log_stream_middleware.go deleted file mode 100644 index 0d68d991..00000000 --- a/backend/internal/handler/ops_log_stream_middleware.go +++ /dev/null @@ -1,100 +0,0 @@ -package handler - -import ( - "strings" - "time" - - "github.com/gin-gonic/gin" - - servermiddleware "github.com/Wei-Shaw/sub2api/internal/server/middleware" - "github.com/Wei-Shaw/sub2api/internal/service" -) - -// OpsLogStreamMiddleware fans every gateway request out to the in-memory -// OpsLogBroadcaster so admin tools can subscribe to a real-time SSE feed. -// -// This is intentionally separate from OpsErrorLoggerMiddleware: -// - OpsErrorLoggerMiddleware persists 4xx/5xx into the database for audit. -// - This middleware streams every request (success + failure) for live UX. -// -// The broadcaster.Publish call is non-blocking by design (see the -// implementation): a slow/missing subscriber NEVER stalls the request path. -// Empty broadcaster (nil receiver, or no subscribers) is a no-op. -func OpsLogStreamMiddleware(b *service.OpsLogBroadcaster) gin.HandlerFunc { - if b == nil { - return func(c *gin.Context) { c.Next() } - } - return func(c *gin.Context) { - start := time.Now() - c.Next() - - entry := service.OpsLogEntry{ - Time: start, - Method: c.Request.Method, - Path: c.Request.URL.Path, - Status: c.Writer.Status(), - LatencyMs: time.Since(start).Milliseconds(), - } - - if v, ok := c.Get(opsModelKey); ok { - if s, ok := v.(string); ok { - entry.Model = s - } - } - if v, ok := c.Get(opsStreamKey); ok { - if streamFlag, ok := v.(bool); ok { - entry.Stream = streamFlag - } - } - if v, ok := c.Get(opsAccountIDKey); ok { - switch t := v.(type) { - case int64: - entry.AccountID = t - case int: - entry.AccountID = int64(t) - } - } - - // Best-effort api-key + group + user from middleware context. - if apiKey, ok := servermiddleware.GetAPIKeyFromContext(c); ok && apiKey != nil { - entry.APIKeyID = apiKey.ID - if apiKey.GroupID != nil { - entry.GroupID = *apiKey.GroupID - } - entry.UserID = apiKey.UserID - } - - // Pull upstream error context (set by gateway services on retries). - if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok { - switch t := v.(type) { - case int: - entry.UpstreamCode = t - case int64: - entry.UpstreamCode = int(t) - } - } - if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok { - if s, ok := v.(string); ok { - entry.ErrorMessage = trimForStream(s) - } - } - if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok { - if s, ok := v.(string); ok { - entry.ErrorDetail = trimForStream(s) - } - } - - b.Publish(entry) - } -} - -// trimForStream caps long error strings so a single broken upstream cannot -// flood the SSE channel with megabyte-sized error blobs. -func trimForStream(s string) string { - const max = 512 - s = strings.TrimSpace(s) - if len(s) > max { - return s[:max] + "…" - } - return s -} diff --git a/backend/internal/server/http.go b/backend/internal/server/http.go index 01f231e7..fad4d8e0 100644 --- a/backend/internal/server/http.go +++ b/backend/internal/server/http.go @@ -40,7 +40,6 @@ func ProvideRouter( settingService *service.SettingService, healthService *service.HealthService, redisClient *redis.Client, - opsLogBroadcaster *service.OpsLogBroadcaster, ) *gin.Engine { if cfg.Server.Mode == "release" { gin.SetMode(gin.ReleaseMode) @@ -97,7 +96,7 @@ func ProvideRouter( service.SetWebSearchManager(websearch.NewManager(configs, redisClient)) }) - return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient, opsLogBroadcaster) + return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient) } // ProvideHTTPServer 提供 HTTP 服务器 diff --git a/backend/internal/server/router.go b/backend/internal/server/router.go index 9fdf3da5..765e6c02 100644 --- a/backend/internal/server/router.go +++ b/backend/internal/server/router.go @@ -33,7 +33,6 @@ func SetupRouter( healthService *service.HealthService, cfg *config.Config, redisClient *redis.Client, - opsLogBroadcaster *service.OpsLogBroadcaster, ) *gin.Engine { // 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src var cachedFrameOrigins atomic.Pointer[[]string] @@ -83,7 +82,7 @@ func SetupRouter( } // 注册路由 - registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient, opsLogBroadcaster) + registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient) return r } @@ -102,7 +101,6 @@ func registerRoutes( healthService *service.HealthService, cfg *config.Config, redisClient *redis.Client, - opsLogBroadcaster *service.OpsLogBroadcaster, ) { // 通用路由(健康检查、状态等) routes.RegisterCommonRoutes(r, healthService) @@ -114,10 +112,10 @@ func registerRoutes( routes.RegisterAuthRoutes(v1, h, jwtAuth, redisClient, settingService) routes.RegisterUserRoutes(v1, h, jwtAuth, settingService) routes.RegisterAdminRoutes(v1, h, adminAuth) - routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, opsLogBroadcaster) + routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg) // Windsurf gateway routes - routes.RegisterWindsurfGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, opsLogBroadcaster) + routes.RegisterWindsurfGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg) routes.RegisterPaymentRoutes(v1, h.Payment, h.PaymentWebhook, h.Admin.Payment, jwtAuth, adminAuth, settingService) diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 65eed1ee..67164cb9 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -132,8 +132,6 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) { ops.GET("/user-concurrency", h.Admin.Ops.GetUserConcurrencyStats) ops.GET("/account-availability", h.Admin.Ops.GetAccountAvailability) ops.GET("/realtime-traffic", h.Admin.Ops.GetRealtimeTrafficSummary) - ops.GET("/logs/stream", h.Admin.Ops.LogStream) - ops.GET("/logs/recent", h.Admin.Ops.LogStreamRecent) // Alerts (rules + events) ops.GET("/alert-rules", h.Admin.Ops.ListAlertRules) @@ -170,11 +168,10 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) { settings.PUT("/metric-thresholds", h.Admin.Ops.UpdateMetricThresholds) } - // WebSocket realtime (QPS/TPS and request stream) + // WebSocket realtime (QPS/TPS) ws := ops.Group("/ws") { ws.GET("/qps", h.Admin.Ops.QPSWSHandler) - ws.GET("/requests", h.Admin.Ops.RequestStreamWSHandler) } // Error logs (legacy) diff --git a/backend/internal/server/routes/gateway.go b/backend/internal/server/routes/gateway.go index b773697b..9541cda1 100644 --- a/backend/internal/server/routes/gateway.go +++ b/backend/internal/server/routes/gateway.go @@ -21,12 +21,10 @@ func RegisterGatewayRoutes( opsService *service.OpsService, settingService *service.SettingService, cfg *config.Config, - opsLogBroadcaster *service.OpsLogBroadcaster, ) { bodyLimit := middleware.RequestBodyLimit(cfg.Gateway.MaxBodySize) clientRequestID := middleware.ClientRequestID() opsErrorLogger := handler.OpsErrorLoggerMiddleware(opsService) - opsLogStream := handler.OpsLogStreamMiddleware(opsLogBroadcaster) endpointNorm := handler.InboundEndpointMiddleware() // 未分组 Key 拦截中间件(按协议格式区分错误响应) @@ -38,7 +36,6 @@ func RegisterGatewayRoutes( gateway.Use(bodyLimit) gateway.Use(clientRequestID) gateway.Use(opsErrorLogger) - gateway.Use(opsLogStream) gateway.Use(endpointNorm) gateway.Use(gin.HandlerFunc(apiKeyAuth)) gateway.Use(requireGroupAnthropic) diff --git a/backend/internal/server/routes/gateway_test.go b/backend/internal/server/routes/gateway_test.go index 02abd828..19ef5686 100644 --- a/backend/internal/server/routes/gateway_test.go +++ b/backend/internal/server/routes/gateway_test.go @@ -37,7 +37,6 @@ func newGatewayRoutesTestRouter() *gin.Engine { nil, nil, &config.Config{}, - nil, ) return router diff --git a/backend/internal/server/routes/windsurf_gateway.go b/backend/internal/server/routes/windsurf_gateway.go index 21f3dd7e..c0cd9bfb 100644 --- a/backend/internal/server/routes/windsurf_gateway.go +++ b/backend/internal/server/routes/windsurf_gateway.go @@ -18,7 +18,6 @@ func RegisterWindsurfGatewayRoutes( opsService *service.OpsService, settingService *service.SettingService, cfg *config.Config, - opsLogBroadcaster *service.OpsLogBroadcaster, ) { if h.Gateway == nil { return @@ -27,7 +26,6 @@ func RegisterWindsurfGatewayRoutes( bodyLimit := middleware.RequestBodyLimit(cfg.Gateway.MaxBodySize) clientRequestID := middleware.ClientRequestID() opsErrorLogger := handler.OpsErrorLoggerMiddleware(opsService) - opsLogStream := handler.OpsLogStreamMiddleware(opsLogBroadcaster) endpointNorm := handler.InboundEndpointMiddleware() requireGroupAnthropic := middleware.RequireGroupAssignment(settingService, middleware.AnthropicErrorWriter) @@ -35,7 +33,6 @@ func RegisterWindsurfGatewayRoutes( windsurfV1.Use(bodyLimit) windsurfV1.Use(clientRequestID) windsurfV1.Use(opsErrorLogger) - windsurfV1.Use(opsLogStream) windsurfV1.Use(endpointNorm) windsurfV1.Use(middleware.ForcePlatform(service.PlatformWindsurf)) windsurfV1.Use(gin.HandlerFunc(apiKeyAuth)) diff --git a/backend/internal/service/ops_log_broadcaster.go b/backend/internal/service/ops_log_broadcaster.go deleted file mode 100644 index ae25f780..00000000 --- a/backend/internal/service/ops_log_broadcaster.go +++ /dev/null @@ -1,237 +0,0 @@ -// Package service exposes domain services. opslog provides a lightweight -// in-memory pub/sub for streaming admin log events without persisting them. -package service - -import ( - "sync" - "sync/atomic" - "time" -) - -// OpsLogEntry is one streamed log event. All fields are optional except Time -// — any missing data simply renders as empty in the admin UI. -type OpsLogEntry struct { - Time time.Time `json:"time"` - Method string `json:"method,omitempty"` - Path string `json:"path,omitempty"` - Status int `json:"status"` - LatencyMs int64 `json:"latency_ms"` - Model string `json:"model,omitempty"` - Stream bool `json:"stream,omitempty"` - AccountID int64 `json:"account_id,omitempty"` - GroupID int64 `json:"group_id,omitempty"` - APIKeyID int64 `json:"api_key_id,omitempty"` - UserID int64 `json:"user_id,omitempty"` - Turns int `json:"turns,omitempty"` - PromptChars int `json:"prompt_chars,omitempty"` - ErrorMessage string `json:"error_message,omitempty"` - ErrorDetail string `json:"error_detail,omitempty"` - UpstreamCode int `json:"upstream_status,omitempty"` -} - -// OpsLogFilter restricts which entries a subscriber receives. Empty fields -// match everything; non-empty fields are AND-combined. -type OpsLogFilter struct { - MinStatus int - Model string - AccountID int64 - GroupID int64 - MinLatencyMs int64 -} - -// matches reports whether the entry passes the filter. -func (f OpsLogFilter) matches(e *OpsLogEntry) bool { - if f.MinStatus > 0 && e.Status < f.MinStatus { - return false - } - if f.Model != "" && e.Model != f.Model { - return false - } - if f.AccountID > 0 && e.AccountID != f.AccountID { - return false - } - if f.GroupID > 0 && e.GroupID != f.GroupID { - return false - } - if f.MinLatencyMs > 0 && e.LatencyMs < f.MinLatencyMs { - return false - } - return true -} - -// OpsLogBroadcaster is a lock-free-ish fan-out broadcaster with a bounded -// ring buffer for history (so freshly connected clients can prime their UI) -// and per-subscriber non-blocking sends (so a slow client never stalls a -// publish on the hot request path). -type OpsLogBroadcaster struct { - subsMu sync.RWMutex - subscribers map[int64]*opsLogSubscription - nextID atomic.Int64 - - historyMu sync.Mutex - history []OpsLogEntry - histHead int - histLen int - histCap int - - // publishedTotal / droppedTotal expose simple ops counters for an - // admin dashboard cell. Atomic so callers don't need to lock. - publishedTotal atomic.Int64 - droppedTotal atomic.Int64 -} - -type opsLogSubscription struct { - ch chan OpsLogEntry - filter OpsLogFilter - // closed is set atomically by unsubscribe() before any cleanup. Publish - // reads this flag under subsMu.RLock and skips closed subscriptions - // instead of attempting a send-on-closed-channel (which would panic). - closed atomic.Bool -} - -// NewOpsLogBroadcaster constructs a broadcaster. historyCap controls how -// many recent entries are kept for newly connected clients; 1000 is a sane -// default. Pass historyCap<=0 to disable the buffer entirely. -func NewOpsLogBroadcaster(historyCap int) *OpsLogBroadcaster { - if historyCap < 0 { - historyCap = 0 - } - b := &OpsLogBroadcaster{ - subscribers: make(map[int64]*opsLogSubscription), - histCap: historyCap, - } - if historyCap > 0 { - b.history = make([]OpsLogEntry, historyCap) - } - return b -} - -// Publish fans the entry out to every matching subscriber and appends it to -// the history buffer. Never blocks: if a subscriber's channel is full, the -// entry is dropped for that subscriber and the broadcaster's drop counter -// is incremented. Hot-path safe. -// -// The same entry value is delivered (by value) to all subscribers and to -// the ring buffer — no shared mutable pointer is leaked, so subscribers -// holding references to past entries cannot observe later mutations. -func (b *OpsLogBroadcaster) Publish(entry OpsLogEntry) { - if entry.Time.IsZero() { - entry.Time = time.Now() - } - b.publishedTotal.Add(1) - - b.appendHistory(entry) - - b.subsMu.RLock() - subs := make([]*opsLogSubscription, 0, len(b.subscribers)) - for _, s := range b.subscribers { - subs = append(subs, s) - } - b.subsMu.RUnlock() - - for _, s := range subs { - // Skip subscriptions that have been unsubscribed since we snapped - // the list. Without this check, a concurrent unsubscribe → close(ch) - // would race the send below and panic on send-to-closed-channel. - if s.closed.Load() { - continue - } - if !s.filter.matches(&entry) { - continue - } - select { - case s.ch <- entry: - default: - b.droppedTotal.Add(1) - } - } -} - -// Subscribe registers a new listener. The returned channel is buffered; -// callers MUST drain it. Cancel by calling the returned unsubscribe func -// (idempotent and safe from any goroutine). -// -// IMPORTANT: unsubscribe does NOT close the channel. Closing it would race -// with concurrent Publish goroutines that may already be holding a snapshot -// of the subscription pointer (causing a panic on send-to-closed-channel). -// Instead, unsubscribe (a) sets the closed atomic flag — Publish skips sends -// to flagged subs — and (b) removes from the subscriber map. Any in-flight -// send that slips past the flag check still proceeds harmlessly into the -// channel buffer and is garbage-collected with the channel once the caller -// drops its reference. Subscribers that need to know the broadcaster is -// done with them should rely on the parent ctx, not channel close. -func (b *OpsLogBroadcaster) Subscribe(filter OpsLogFilter, bufSize int) (<-chan OpsLogEntry, func()) { - if bufSize <= 0 { - bufSize = 1024 - } - id := b.nextID.Add(1) - sub := &opsLogSubscription{ - ch: make(chan OpsLogEntry, bufSize), - filter: filter, - } - b.subsMu.Lock() - b.subscribers[id] = sub - b.subsMu.Unlock() - - var unsubOnce sync.Once - unsubscribe := func() { - unsubOnce.Do(func() { - sub.closed.Store(true) - b.subsMu.Lock() - delete(b.subscribers, id) - b.subsMu.Unlock() - }) - } - return sub.ch, unsubscribe -} - -// Snapshot returns a copy of the recent history (oldest → newest), filtered -// by the given filter. Used by /admin/ops/logs/recent to prime newly opened -// dashboards before live events arrive. -func (b *OpsLogBroadcaster) Snapshot(filter OpsLogFilter, maxEntries int) []OpsLogEntry { - b.historyMu.Lock() - defer b.historyMu.Unlock() - - if b.histLen == 0 { - return nil - } - out := make([]OpsLogEntry, 0, b.histLen) - start := b.histHead - b.histLen - if start < 0 { - start += b.histCap - } - for i := 0; i < b.histLen; i++ { - idx := (start + i) % b.histCap - e := b.history[idx] - if !filter.matches(&e) { - continue - } - out = append(out, e) - } - if maxEntries > 0 && len(out) > maxEntries { - out = out[len(out)-maxEntries:] - } - return out -} - -// Stats reports cumulative publish/drop counts and the current subscriber -// count for diagnostics. -func (b *OpsLogBroadcaster) Stats() (published, dropped int64, subscribers int) { - b.subsMu.RLock() - subscribers = len(b.subscribers) - b.subsMu.RUnlock() - return b.publishedTotal.Load(), b.droppedTotal.Load(), subscribers -} - -func (b *OpsLogBroadcaster) appendHistory(e OpsLogEntry) { - if b.histCap == 0 { - return - } - b.historyMu.Lock() - defer b.historyMu.Unlock() - b.history[b.histHead] = e - b.histHead = (b.histHead + 1) % b.histCap - if b.histLen < b.histCap { - b.histLen++ - } -} diff --git a/backend/internal/service/ops_log_broadcaster_test.go b/backend/internal/service/ops_log_broadcaster_test.go deleted file mode 100644 index c6ff261b..00000000 --- a/backend/internal/service/ops_log_broadcaster_test.go +++ /dev/null @@ -1,267 +0,0 @@ -package service - -import ( - "sync" - "testing" - "time" -) - -func TestOpsLogBroadcaster_FanOutDeliversToMatchingSubscribers(t *testing.T) { - b := NewOpsLogBroadcaster(16) - - chHigh, unHigh := b.Subscribe(OpsLogFilter{MinStatus: 500}, 8) - defer unHigh() - chAll, unAll := b.Subscribe(OpsLogFilter{}, 8) - defer unAll() - - b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6"}) - b.Publish(OpsLogEntry{Status: 503, Model: "kimi-k2.5"}) - - got200 := receiveOrTimeout(t, chAll, 200*time.Millisecond) - got503 := receiveOrTimeout(t, chAll, 200*time.Millisecond) - if got200.Status != 200 || got503.Status != 503 { - t.Fatalf("unexpected fan-out: %d / %d", got200.Status, got503.Status) - } - - gotHigh := receiveOrTimeout(t, chHigh, 200*time.Millisecond) - if gotHigh.Status != 503 { - t.Fatalf("filter MinStatus=500 should drop 200, got %d", gotHigh.Status) - } - expectNoMessage(t, chHigh, 50*time.Millisecond) -} - -func TestOpsLogBroadcaster_FilterByModelAndAccount(t *testing.T) { - b := NewOpsLogBroadcaster(0) - - chKimi, unKimi := b.Subscribe(OpsLogFilter{Model: "kimi-k2.5"}, 4) - defer unKimi() - chAcct, unAcct := b.Subscribe(OpsLogFilter{AccountID: 42}, 4) - defer unAcct() - - b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6", AccountID: 1}) - b.Publish(OpsLogEntry{Status: 200, Model: "kimi-k2.5", AccountID: 42}) - - got := receiveOrTimeout(t, chKimi, 200*time.Millisecond) - if got.Model != "kimi-k2.5" { - t.Fatalf("expected kimi entry, got %+v", got) - } - expectNoMessage(t, chKimi, 50*time.Millisecond) - - gotA := receiveOrTimeout(t, chAcct, 200*time.Millisecond) - if gotA.AccountID != 42 { - t.Fatalf("expected account 42, got %d", gotA.AccountID) - } -} - -func TestOpsLogBroadcaster_NeverBlocksOnSlowSubscriber(t *testing.T) { - b := NewOpsLogBroadcaster(0) - - // Subscriber with buffer=1, never reads. After the second Publish, - // the entry must be dropped instead of blocking the publisher. - _, unsub := b.Subscribe(OpsLogFilter{}, 1) - defer unsub() - - done := make(chan struct{}) - go func() { - for i := 0; i < 100; i++ { - b.Publish(OpsLogEntry{Status: 200}) - } - close(done) - }() - - select { - case <-done: - case <-time.After(2 * time.Second): - t.Fatal("publisher blocked on slow subscriber") - } - - _, dropped, _ := b.Stats() - if dropped == 0 { - t.Fatal("expected dropped count > 0 when subscriber buffer overflows") - } -} - -func TestOpsLogBroadcaster_HistorySnapshot(t *testing.T) { - b := NewOpsLogBroadcaster(3) - - for i := 1; i <= 5; i++ { - b.Publish(OpsLogEntry{Status: 200 + i}) - } - - got := b.Snapshot(OpsLogFilter{}, 0) - if len(got) != 3 { - t.Fatalf("expected ring of 3, got %d", len(got)) - } - // Oldest → newest - if got[0].Status != 203 || got[1].Status != 204 || got[2].Status != 205 { - t.Fatalf("expected 203/204/205, got %d/%d/%d", got[0].Status, got[1].Status, got[2].Status) - } -} - -func TestOpsLogBroadcaster_HistoryAppliesFilter(t *testing.T) { - b := NewOpsLogBroadcaster(8) - - b.Publish(OpsLogEntry{Status: 200}) - b.Publish(OpsLogEntry{Status: 500}) - b.Publish(OpsLogEntry{Status: 503}) - - got := b.Snapshot(OpsLogFilter{MinStatus: 500}, 0) - if len(got) != 2 { - t.Fatalf("expected 2 high-status entries, got %d: %+v", len(got), got) - } -} - -func TestOpsLogBroadcaster_UnsubscribeIdempotent(t *testing.T) { - b := NewOpsLogBroadcaster(0) - _, unsub := b.Subscribe(OpsLogFilter{}, 1) - unsub() - unsub() // second call must not panic - unsub() // and a third -} - -func TestOpsLogBroadcaster_ZeroTimeFilledIn(t *testing.T) { - b := NewOpsLogBroadcaster(2) - - b.Publish(OpsLogEntry{Status: 200}) // Time intentionally zero - got := b.Snapshot(OpsLogFilter{}, 0) - if len(got) != 1 { - t.Fatalf("expected 1 entry, got %d", len(got)) - } - if got[0].Time.IsZero() { - t.Fatal("Publish should populate zero Time with time.Now()") - } -} - -func TestOpsLogBroadcaster_ConcurrentSafe(t *testing.T) { - b := NewOpsLogBroadcaster(64) - - // Spin a few subscribers and producers; rely on -race to surface - // any concurrency bugs in the fan-out path. - var wg sync.WaitGroup - for i := 0; i < 4; i++ { - ch, unsub := b.Subscribe(OpsLogFilter{}, 32) - wg.Add(1) - go func() { - defer wg.Done() - defer unsub() - deadline := time.Now().Add(200 * time.Millisecond) - for time.Now().Before(deadline) { - select { - case <-ch: - case <-time.After(5 * time.Millisecond): - } - } - }() - } - - for i := 0; i < 4; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 200; j++ { - b.Publish(OpsLogEntry{Status: 200, Model: "x"}) - } - }() - } - - wg.Wait() - pub, _, _ := b.Stats() - if pub != 800 { - t.Fatalf("expected 800 publishes, got %d", pub) - } -} - -// TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic exercises the exact -// race the audit identified: a Publish goroutine has snapped a subscription -// pointer while another goroutine unsubscribes (close(ch)) the moment before -// the send. Without the closed-flag guard in Publish, this races into -// "send on closed channel" and panics. With the guard, Publish observes -// closed=true and skips the send. Run with -race. -func TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic(t *testing.T) { - b := NewOpsLogBroadcaster(0) - - var wg sync.WaitGroup - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 200; j++ { - ch, unsub := b.Subscribe(OpsLogFilter{}, 1) - // Drain non-blockingly until the next publish lands. - done := make(chan struct{}) - go func() { - defer close(done) - timer := time.NewTimer(50 * time.Millisecond) - defer timer.Stop() - for { - select { - case <-ch: - case <-timer.C: - return - } - } - }() - b.Publish(OpsLogEntry{Status: 200}) - unsub() - b.Publish(OpsLogEntry{Status: 200}) // post-unsub publish must not panic - <-done - } - }() - } - wg.Wait() -} - -// TestOpsLogBroadcaster_SnapshotConcurrentWithPublish ensures Snapshot is -// safe under concurrent Publish (verifies subsMu vs historyMu coexistence). -func TestOpsLogBroadcaster_SnapshotConcurrentWithPublish(t *testing.T) { - b := NewOpsLogBroadcaster(32) - - stop := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - for { - select { - case <-stop: - return - default: - b.Publish(OpsLogEntry{Status: 200}) - } - } - }() - - go func() { - defer wg.Done() - for i := 0; i < 200; i++ { - _ = b.Snapshot(OpsLogFilter{}, 0) - } - }() - - time.Sleep(50 * time.Millisecond) - close(stop) - wg.Wait() -} - -// helpers -------------------------------------------------------------- - -func receiveOrTimeout(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) OpsLogEntry { - t.Helper() - select { - case e := <-ch: - return e - case <-time.After(d): - t.Fatalf("timeout waiting for entry after %s", d) - } - return OpsLogEntry{} -} - -func expectNoMessage(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) { - t.Helper() - select { - case e := <-ch: - t.Fatalf("unexpected message: %+v", e) - case <-time.After(d): - } -} diff --git a/backend/internal/service/request_event_bus.go b/backend/internal/service/request_event_bus.go deleted file mode 100644 index 0664f71f..00000000 --- a/backend/internal/service/request_event_bus.go +++ /dev/null @@ -1,75 +0,0 @@ -package service - -import ( - "sync" - "sync/atomic" - "time" -) - -const requestEventBufSize = 64 - -// RequestEvent is published for every gateway dispatch completion. -type RequestEvent struct { - Timestamp time.Time `json:"timestamp"` - Method string `json:"method"` - Path string `json:"path"` - Model string `json:"model"` - AccountID int64 `json:"account_id"` - // Status is "success", "error", or "rate_limited". - Status string `json:"status"` - LatencyMS int64 `json:"latency_ms"` -} - -// RequestEventBus is a fan-out hub for real-time request events. -// Publishers call Publish; subscribers call Subscribe/Unsubscribe. -// Each subscriber gets its own buffered channel. If the buffer is full -// the event is dropped for that subscriber (non-blocking publish). -type RequestEventBus struct { - mu sync.RWMutex - subscribers map[uint64]chan RequestEvent - nextID atomic.Uint64 -} - -func NewRequestEventBus() *RequestEventBus { - return &RequestEventBus{ - subscribers: make(map[uint64]chan RequestEvent), - } -} - -// Subscribe registers a new subscriber and returns its ID and a receive-only channel. -func (b *RequestEventBus) Subscribe() (uint64, <-chan RequestEvent) { - id := b.nextID.Add(1) - ch := make(chan RequestEvent, requestEventBufSize) - b.mu.Lock() - b.subscribers[id] = ch - b.mu.Unlock() - return id, ch -} - -// Unsubscribe removes a subscriber and closes its channel. -func (b *RequestEventBus) Unsubscribe(id uint64) { - b.mu.Lock() - ch, ok := b.subscribers[id] - if ok { - delete(b.subscribers, id) - } - b.mu.Unlock() - if ok { - close(ch) - } -} - -// Publish sends an event to all current subscribers without blocking. -func (b *RequestEventBus) Publish(e RequestEvent) { - if b == nil { - return - } - b.mu.RLock() - defer b.mu.RUnlock() - for _, ch := range b.subscribers { - select { - case ch <- e: - default: - } - } -} diff --git a/backend/internal/service/request_event_bus_test.go b/backend/internal/service/request_event_bus_test.go deleted file mode 100644 index 9c26912e..00000000 --- a/backend/internal/service/request_event_bus_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package service - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestRequestEventBus_PublishToSubscriber(t *testing.T) { - bus := NewRequestEventBus() - - id, ch := bus.Subscribe() - defer bus.Unsubscribe(id) - - evt := RequestEvent{Model: "claude-3", Status: "success", LatencyMS: 100} - bus.Publish(evt) - - select { - case got := <-ch: - assert.Equal(t, evt, got) - case <-time.After(time.Second): - t.Fatal("timed out waiting for event") - } -} - -func TestRequestEventBus_MultipleSubscribers(t *testing.T) { - bus := NewRequestEventBus() - - id1, ch1 := bus.Subscribe() - id2, ch2 := bus.Subscribe() - defer bus.Unsubscribe(id1) - defer bus.Unsubscribe(id2) - - evt := RequestEvent{Model: "claude-3", Status: "error"} - bus.Publish(evt) - - for _, ch := range []<-chan RequestEvent{ch1, ch2} { - select { - case got := <-ch: - assert.Equal(t, evt, got) - case <-time.After(time.Second): - t.Fatal("timed out waiting for event on one subscriber") - } - } -} - -func TestRequestEventBus_UnsubscribeClosesChannel(t *testing.T) { - bus := NewRequestEventBus() - id, ch := bus.Subscribe() - - bus.Unsubscribe(id) - - // Channel should be closed. - _, ok := <-ch - assert.False(t, ok, "channel should be closed after Unsubscribe") -} - -func TestRequestEventBus_UnsubscribedMissesEvents(t *testing.T) { - bus := NewRequestEventBus() - id, _ := bus.Subscribe() - bus.Unsubscribe(id) - - // Publish after unsubscribe should not panic. - require.NotPanics(t, func() { - bus.Publish(RequestEvent{Model: "test"}) - }) -} - -func TestRequestEventBus_DropWhenFull(t *testing.T) { - bus := NewRequestEventBus() - id, ch := bus.Subscribe() - defer bus.Unsubscribe(id) - - // Fill the buffer then publish one more — should drop, not block. - evt := RequestEvent{Model: "model", Status: "success"} - for i := 0; i < requestEventBufSize; i++ { - bus.Publish(evt) - } - // This publish should return immediately (dropped). - done := make(chan struct{}) - go func() { - bus.Publish(evt) - close(done) - }() - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("Publish blocked when buffer was full") - } - assert.Len(t, ch, requestEventBufSize) -} - -func TestRequestEventBus_NilSafePublish(t *testing.T) { - var bus *RequestEventBus - require.NotPanics(t, func() { - bus.Publish(RequestEvent{Model: "test"}) - }) -} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index cc303d43..b2309e31 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -453,7 +453,6 @@ var ProviderSet = wire.NewSet( NewAnnouncementService, NewAdminService, NewRPMTokenBucketService, - NewRequestEventBus, NewGatewayService, NewOpenAIGatewayService, NewOAuthService, @@ -533,7 +532,6 @@ var ProviderSet = wire.NewSet( ProvideWindsurfRefreshService, ProvideWindsurfProbeService, ProvideWindsurfTierAccessService, - ProvideOpsLogBroadcaster, ProvideChannelMonitorService, ProvideChannelMonitorRunner, NewChannelMonitorRequestTemplateService, @@ -556,17 +554,6 @@ func ProvideWindsurfTierAccessService(cfg *config.Config, accountRepo AccountRep return NewWindsurfTierAccessService(accountRepo) } -// ProvideOpsLogBroadcaster builds the in-memory ops log fan-out. -// -// Always returns a non-nil broadcaster — middleware/handler call sites -// rely on a stable receiver and gracefully no-op when the feature is -// effectively disabled (e.g. monitoring globally turned off via OpsService). -// History capacity is fixed at 1000 entries to bound memory. -func ProvideOpsLogBroadcaster() *OpsLogBroadcaster { - const historyCap = 1000 - return NewOpsLogBroadcaster(historyCap) -} - // ProvideWindsurfLSService creates WindsurfLSService (nil when windsurf is disabled). func ProvideWindsurfLSService(cfg *config.Config) *WindsurfLSService { if !cfg.Windsurf.Enabled { diff --git a/frontend/src/api/admin/ops.ts b/frontend/src/api/admin/ops.ts index fff5014b..69235668 100644 --- a/frontend/src/api/admin/ops.ts +++ b/frontend/src/api/admin/ops.ts @@ -1324,190 +1324,7 @@ export const opsAPI = { updateMetricThresholds, listSystemLogs, cleanupSystemLogs, - getSystemLogSinkHealth, - getRecentOpsLogs, - subscribeOpsLogStream + getSystemLogSinkHealth } export default opsAPI - -// ===== Real-time ops log stream ===================================== - -export interface OpsLogEntry { - time: string - method?: string - path?: string - status: number - latency_ms: number - model?: string - stream?: boolean - account_id?: number - group_id?: number - api_key_id?: number - user_id?: number - turns?: number - prompt_chars?: number - error_message?: string - error_detail?: string - upstream_status?: number -} - -export interface OpsLogFilter { - min_status?: number - model?: string - account_id?: number - group_id?: number - min_latency_ms?: number -} - -export interface OpsLogRecentResponse { - entries: OpsLogEntry[] - published_total: number - dropped_total: number - subscribers: number -} - -function buildLogQuery(filter: OpsLogFilter): string { - const params = new URLSearchParams() - if (filter.min_status && filter.min_status > 0) params.set('min_status', String(filter.min_status)) - if (filter.model) params.set('model', filter.model) - if (filter.account_id && filter.account_id > 0) params.set('account_id', String(filter.account_id)) - if (filter.group_id && filter.group_id > 0) params.set('group_id', String(filter.group_id)) - if (filter.min_latency_ms && filter.min_latency_ms > 0) params.set('min_latency_ms', String(filter.min_latency_ms)) - const qs = params.toString() - return qs ? `?${qs}` : '' -} - -export async function getRecentOpsLogs( - filter: OpsLogFilter = {}, - max?: number -): Promise { - const params: Record = {} - if (filter.min_status) params.min_status = filter.min_status - if (filter.model) params.model = filter.model - if (filter.account_id) params.account_id = filter.account_id - if (filter.group_id) params.group_id = filter.group_id - if (filter.min_latency_ms) params.min_latency_ms = filter.min_latency_ms - if (max) params.max = max - - const { data } = await apiClient.get('/admin/ops/logs/recent', { params }) - return data -} - -export interface SubscribeLogsOptions { - onEntry: (entry: OpsLogEntry) => void - onStatus?: (status: 'connecting' | 'live' | 'closed' | 'error') => void - onError?: (err: Error) => void -} - -export type LogStreamHandle = { - close: () => void -} - -/** - * Subscribe to /admin/ops/logs/stream via fetch + ReadableStream. - * - * EventSource doesn't allow custom headers, so we cannot use it for our - * Bearer-token authenticated SSE endpoint. fetch with `accept: text/event-stream` - * gets us the same wire protocol with Authorization support. - * - * The returned handle is idempotent — calling close() multiple times is safe. - */ -export function subscribeOpsLogStream( - filter: OpsLogFilter, - opts: SubscribeLogsOptions -): LogStreamHandle { - const ctrl = new AbortController() - let closed = false - - const close = () => { - if (closed) return - closed = true - ctrl.abort() - opts.onStatus?.('closed') - } - - const run = async () => { - opts.onStatus?.('connecting') - const baseURL = (apiClient.defaults.baseURL ?? '/api/v1').replace(/\/+$/, '') - const url = `${baseURL}/admin/ops/logs/stream${buildLogQuery(filter)}` - const token = localStorage.getItem('auth_token') ?? '' - - let resp: Response - try { - resp = await fetch(url, { - method: 'GET', - signal: ctrl.signal, - credentials: 'include', - headers: { - accept: 'text/event-stream', - ...(token ? { Authorization: `Bearer ${token}` } : {}) - } - }) - } catch (e: any) { - if (closed || ctrl.signal.aborted) return - opts.onStatus?.('error') - opts.onError?.(e instanceof Error ? e : new Error(String(e))) - return - } - - if (!resp.ok || !resp.body) { - opts.onStatus?.('error') - opts.onError?.(new Error(`SSE ${resp.status} ${resp.statusText}`)) - return - } - - opts.onStatus?.('live') - const reader = resp.body.getReader() - const decoder = new TextDecoder('utf-8') - let buffer = '' - - try { - while (!closed) { - const { done, value } = await reader.read() - if (done) break - // Normalize CRLF and bare CR to LF before scanning. The SSE wire - // format permits \r\n line endings; without this, a trailing \r - // would leak into the JSON payload and silently break JSON.parse. - buffer += decoder.decode(value, { stream: true }).replace(/\r\n?/g, '\n') - - // SSE events are separated by a blank line. - let sep: number - while ((sep = buffer.indexOf('\n\n')) !== -1) { - const rawEvent = buffer.slice(0, sep) - buffer = buffer.slice(sep + 2) - - let dataLine = '' - for (const line of rawEvent.split('\n')) { - if (line.startsWith(':')) continue // comment / heartbeat - if (line.startsWith('data:')) { - // Per SSE spec, multiple `data:` lines in one event are - // joined by '\n', not concatenated. Our JSON-encoded entries - // never contain unescaped LF, but we follow the spec to - // future-proof against a field that emits raw bytes. - const piece = line.slice(5).replace(/^ /, '') - dataLine += dataLine ? '\n' + piece : piece - } - } - if (!dataLine) continue - try { - const parsed = JSON.parse(dataLine) as OpsLogEntry - opts.onEntry(parsed) - } catch { - // skip malformed payload — server uses well-formed JSON, this is defensive - } - } - } - } catch (e: any) { - if (!closed) { - opts.onStatus?.('error') - opts.onError?.(e instanceof Error ? e : new Error(String(e))) - } - } finally { - if (!closed) opts.onStatus?.('closed') - } - } - - void run() - return { close } -} diff --git a/frontend/src/components/layout/AppSidebar.vue b/frontend/src/components/layout/AppSidebar.vue index bede24e9..3d7f1604 100644 --- a/frontend/src/components/layout/AppSidebar.vue +++ b/frontend/src/components/layout/AppSidebar.vue @@ -718,7 +718,6 @@ const adminNavItems = computed((): NavItem[] => { const baseItems: NavItem[] = [ { path: '/admin/dashboard', label: t('nav.dashboard'), icon: DashboardIcon }, { path: '/admin/ops', label: t('nav.ops'), icon: ChartIcon, featureFlag: flagOpsMonitoring }, - { path: '/admin/ops/logs', label: t('nav.opsLogStream'), icon: ChartIcon, featureFlag: flagOpsMonitoring }, { path: '/admin/users', label: t('nav.users'), icon: UsersIcon, hideInSimpleMode: true }, { path: '/admin/groups', label: t('nav.groups'), icon: FolderIcon, hideInSimpleMode: true }, { diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index 0b975672..661c9b15 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -367,7 +367,6 @@ export default { proxies: 'Proxies', redeemCodes: 'Redeem Codes', ops: 'Ops', - opsLogStream: 'Live Logs', promoCodes: 'Promo Codes', settings: 'Settings', myAccount: 'My Account', @@ -4466,39 +4465,6 @@ export default { ops: { title: 'Ops Monitoring', description: 'Operational monitoring and troubleshooting', - logStream: { - title: 'Live Request Logs', - description: 'Subscribe to a real-time SSE stream of every gateway request', - empty: 'No logs yet — waiting for new requests or adjust filters', - loadError: 'Failed to load live log stream', - pause: 'Pause', - resume: 'Resume', - clearLogs: 'Clear', - scrollToBottom: 'Jump to latest', - statusBar: 'Showing {shown} · {pending} queued while paused · {dropped} dropped', - status: { - live: 'Live', - connecting: 'Connecting', - closed: 'Disconnected', - error: 'Error' - }, - filter: { - allStatuses: 'All statuses', - modelPlaceholder: 'Model (exact)', - accountIdPlaceholder: 'Account ID', - minLatencyMs: 'Min latency ms' - }, - col: { - time: 'Time', - method: 'Method', - path: 'Path', - status: 'Status', - latency: 'Latency', - model: 'Model', - accountId: 'Account', - error: 'Error' - } - }, // Dashboard systemHealth: 'System Health', overview: 'Overview', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 6b595bef..f50e218b 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -367,7 +367,6 @@ export default { proxies: 'IP管理', redeemCodes: '兑换码', ops: '运维监控', - opsLogStream: '实时日志', promoCodes: '优惠码', settings: '系统设置', myAccount: '我的账户', @@ -4622,39 +4621,6 @@ export default { ops: { title: '运维监控', description: '运维监控与排障', - logStream: { - title: '实时请求日志', - description: '订阅每条网关请求的 SSE 实时流', - empty: '暂无日志,请等待新请求或调整过滤器', - loadError: '加载实时日志失败', - pause: '暂停', - resume: '继续', - clearLogs: '清空', - scrollToBottom: '回到最新', - statusBar: '当前显示 {shown} 条,暂停队列 {pending} 条,已丢弃 {dropped} 条', - status: { - live: '实时', - connecting: '连接中', - closed: '已断开', - error: '错误' - }, - filter: { - allStatuses: '全部状态', - modelPlaceholder: '模型 (精确匹配)', - accountIdPlaceholder: 'Account ID', - minLatencyMs: '最小延迟 ms' - }, - col: { - time: '时间', - method: '方法', - path: '路径', - status: '状态', - latency: '延迟', - model: '模型', - accountId: '账号', - error: '错误' - } - }, // Dashboard systemHealth: '系统健康', overview: '概览', diff --git a/frontend/src/router/index.ts b/frontend/src/router/index.ts index e60703e5..1170214c 100644 --- a/frontend/src/router/index.ts +++ b/frontend/src/router/index.ts @@ -401,18 +401,6 @@ const routes: RouteRecordRaw[] = [ descriptionKey: 'admin.ops.description' } }, - { - path: '/admin/ops/logs', - name: 'AdminOpsLogStream', - component: () => import('@/views/admin/ops/OpsLogStreamView.vue'), - meta: { - requiresAuth: true, - requiresAdmin: true, - title: 'Live Log Stream', - titleKey: 'admin.ops.logStream.title', - descriptionKey: 'admin.ops.logStream.description' - } - }, { path: '/admin/users', name: 'AdminUsers', diff --git a/frontend/src/views/admin/ops/OpsLogStreamView.vue b/frontend/src/views/admin/ops/OpsLogStreamView.vue deleted file mode 100644 index ff3e5c5d..00000000 --- a/frontend/src/views/admin/ops/OpsLogStreamView.vue +++ /dev/null @@ -1,390 +0,0 @@ - - - \ No newline at end of file