chore: remove unused real-time log stream / request event bus

删除 fork 独有的实时日志相关功能(上游 Wei-Shaw/sub2api 不存在):

A. OpsLogBroadcaster + SSE 日志流(前端有用但用户不需要):
- backend/internal/service/ops_log_broadcaster{,_test}.go
- backend/internal/handler/ops_log_stream_middleware.go
- backend/internal/handler/admin/ops_log_stream_handler.go
- backend/internal/server/routes/admin.go: GET /admin/ops/logs/{stream,recent}
- backend/internal/server/routes/{gateway,windsurf_gateway}.go: opsLogStream middleware
- backend/internal/service/wire.go: ProvideOpsLogBroadcaster
- frontend/src/views/admin/ops/OpsLogStreamView.vue
- frontend/src/api/admin/ops.ts: subscribeOpsLogStream, getRecentOpsLogs,
  OpsLogEntry/OpsLogFilter/OpsLogRecentResponse 类型
- frontend/src/router/index.ts: AdminOpsLogStream 路由
- frontend/src/components/layout/AppSidebar.vue: 侧边栏入口
- frontend/src/i18n/locales/{en,zh}.ts: nav.opsLogStream + admin.ops.logStream 全部文案

B. RequestEventBus + WS 请求事件流(前端零调用 dead code):
- backend/internal/service/request_event_bus{,_test}.go
- backend/internal/handler/admin/ops_ws_requests_handler.go
- backend/internal/server/routes/admin.go: GET /admin/ops/ws/requests
- backend/internal/handler/gateway_handler.go: RequestEventBus 字段/参数 +
  reqStartTime + reqEventAccountID/reqEventStatus 跟踪 + defer Publish
- backend/internal/service/wire.go: NewRequestEventBus
- backend/internal/handler/admin/ops_handler.go: OpsHandler 中
  requestEventBus + logBroadcaster 字段,简化 NewOpsHandler 签名

保留:
- /admin/ops/ws/qps (前端 QPS 监控仍在用)
- /admin/ops/realtime-traffic (前端在用)
- OpsErrorLoggerMiddleware (与本次无关)

签名变更:
- NewOpsHandler(opsService) — 移除 requestEventBus, logBroadcaster
- NewGatewayHandler(...): 移除 requestEventBus 末位参数
- ProvideRouter / SetupRouter / registerRoutes / RegisterGatewayRoutes /
  RegisterWindsurfGatewayRoutes: 移除 opsLogBroadcaster 参数
- 同步更新 wire_gen.go + 测试调用点

验证:
- 后端 go build/vet 通过
- 前端 pnpm run build 通过 (9.48s)
- 测试: 2 个 baseline 既存失败 (TestProxyImportData...,
  TestWindsurfTierAccessService_Snapshot_HappyPath) 与本次无关
This commit is contained in:
win 2026-05-20 22:43:20 +08:00
parent 502d57652f
commit 82bc1e199f
25 changed files with 29 additions and 1935 deletions

View File

@ -207,9 +207,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
defaultLoadBalancer := payment.ProvideDefaultLoadBalancer(client, encryptionKey)
paymentService := service.ProvidePaymentService(client, registry, defaultLoadBalancer, redeemService, subscriptionService, paymentConfigService, userRepository, groupRepository, affiliateService, notificationEmailService)
settingHandler := handler.ProvideAdminSettingHandler(settingService, emailService, turnstileService, opsService, paymentConfigService, paymentService, userAttributeService, notificationEmailService)
requestEventBus := service.NewRequestEventBus()
opsLogBroadcaster := service.ProvideOpsLogBroadcaster()
opsHandler := admin.NewOpsHandler(opsService, requestEventBus, opsLogBroadcaster)
opsHandler := admin.NewOpsHandler(opsService)
updateCache := repository.NewUpdateCache(redisClient)
gitHubReleaseClient := repository.ProvideGitHubReleaseClient(configConfig)
serviceBuildInfo := provideServiceBuildInfo(buildInfo)
@ -252,7 +250,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig)
userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient)
userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig)
gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, windsurfGatewayService, userService, concurrencyService, billingCacheService, usageService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, userMessageQueueService, configConfig, settingService, requestEventBus)
gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, windsurfGatewayService, userService, concurrencyService, billingCacheService, usageService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, userMessageQueueService, configConfig, settingService)
openAIGatewayHandler := handler.NewOpenAIGatewayHandler(openAIGatewayService, concurrencyService, billingCacheService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, contentModerationService, configConfig)
handlerSettingHandler := handler.ProvideSettingHandler(settingService, buildInfo, notificationEmailService)
totpHandler := handler.NewTotpHandler(totpService)
@ -266,7 +264,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService)
apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig)
healthService := service.NewHealthService(db, redisClient)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, healthService, redisClient, opsLogBroadcaster)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, healthService, redisClient)
httpServer := server.ProvideHTTPServer(configConfig, engine)
opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig)
opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig)

View File

@ -14,9 +14,7 @@ import (
)
type OpsHandler struct {
opsService *service.OpsService
requestEventBus *service.RequestEventBus
logBroadcaster *service.OpsLogBroadcaster
opsService *service.OpsService
}
// GetErrorLogByID returns ops error log detail.
@ -70,8 +68,8 @@ func parseOpsViewParam(c *gin.Context) string {
}
}
func NewOpsHandler(opsService *service.OpsService, requestEventBus *service.RequestEventBus, logBroadcaster *service.OpsLogBroadcaster) *OpsHandler {
return &OpsHandler{opsService: opsService, requestEventBus: requestEventBus, logBroadcaster: logBroadcaster}
func NewOpsHandler(opsService *service.OpsService) *OpsHandler {
return &OpsHandler{opsService: opsService}
}
// GetErrorLogs lists ops error logs.

View File

@ -1,210 +0,0 @@
package admin
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
const (
opsLogStreamHeartbeat = 25 * time.Second
opsLogStreamRecentMax = 200
opsLogStreamSubBufEntries = 1024
opsLogStreamModelMaxLen = 256
)
// LogStream serves a Server-Sent Events feed of every gateway request.
//
// GET /api/v1/admin/ops/logs/stream?min_status=400&model=glm-4.7&account_id=42&min_latency_ms=2000
//
// Filter query params (all optional, AND-combined):
//
// min_status — int only emit when entry.status >= this value
// model — exact match on model key
// account_id — int64
// group_id — int64
// min_latency_ms — int64 only emit when entry.latency_ms >= this value
//
// The handler keeps the connection open until the client disconnects, the
// monitoring is disabled, or the broadcaster is torn down. A heartbeat
// comment line is sent every 25s so reverse proxies don't time out idle
// streams.
func (h *OpsHandler) LogStream(c *gin.Context) {
if h.logBroadcaster == nil {
response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured")
return
}
// nil opsService is allowed for lightweight deployments / tests where
// the OpsService dependency is intentionally absent. The admin auth
// middleware on the route group still enforces JWT + admin role, so
// the stream is never reachable anonymously.
if h.opsService != nil {
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
}
filter, err := parseOpsLogFilter(c)
if err != nil {
response.BadRequest(c, err.Error())
return
}
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
response.Error(c, http.StatusInternalServerError, "streaming unsupported")
return
}
ch, unsubscribe := h.logBroadcaster.Subscribe(filter, opsLogStreamSubBufEntries)
defer unsubscribe()
// Prime client with recent buffered history (so a fresh dashboard tab
// renders something immediately instead of staying blank).
for _, e := range h.logBroadcaster.Snapshot(filter, opsLogStreamRecentMax) {
if err := writeOpsLogSSE(c.Writer, &e); err != nil {
return
}
}
flusher.Flush()
heartbeat := time.NewTicker(opsLogStreamHeartbeat)
defer heartbeat.Stop()
ctxDone := c.Request.Context().Done()
for {
select {
case <-ctxDone:
return
case <-heartbeat.C:
if _, err := io.WriteString(c.Writer, ": ping\n\n"); err != nil {
return
}
flusher.Flush()
case entry := <-ch:
if err := writeOpsLogSSE(c.Writer, &entry); err != nil {
return
}
flusher.Flush()
}
}
}
// LogStreamRecent returns the broadcaster history without subscribing.
// Useful for one-shot polling when the admin panel cannot keep an open
// SSE connection (e.g. behind a buffering proxy).
//
// GET /api/v1/admin/ops/logs/recent?min_status=400&max=500
func (h *OpsHandler) LogStreamRecent(c *gin.Context) {
if h.logBroadcaster == nil {
response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured")
return
}
if h.opsService != nil {
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
}
filter, err := parseOpsLogFilter(c)
if err != nil {
response.BadRequest(c, err.Error())
return
}
maxN := opsLogStreamRecentMax
if v := strings.TrimSpace(c.Query("max")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 2000 {
maxN = n
}
}
published, dropped, subs := h.logBroadcaster.Stats()
response.Success(c, gin.H{
"entries": h.logBroadcaster.Snapshot(filter, maxN),
"published_total": published,
"dropped_total": dropped,
"subscribers": subs,
})
}
func parseOpsLogFilter(c *gin.Context) (service.OpsLogFilter, error) {
f := service.OpsLogFilter{}
if v := strings.TrimSpace(c.Query("min_status")); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n < 0 {
return f, fmt.Errorf("invalid min_status")
}
f.MinStatus = n
}
if v := strings.TrimSpace(c.Query("model")); v != "" {
// Cap input to keep an authenticated admin from stuffing huge
// strings into long-lived subscription state.
if len(v) > opsLogStreamModelMaxLen {
return f, fmt.Errorf("model too long (max %d)", opsLogStreamModelMaxLen)
}
f.Model = v
}
if v := strings.TrimSpace(c.Query("account_id")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
// Reject 0 — matches() treats AccountID==0 as "match all", so a
// param of 0 would silently degrade to no-filter without telling
// the user. Demand a positive id (callers wanting all should omit
// the param).
if err != nil || n <= 0 {
return f, fmt.Errorf("invalid account_id")
}
f.AccountID = n
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil || n <= 0 {
return f, fmt.Errorf("invalid group_id")
}
f.GroupID = n
}
if v := strings.TrimSpace(c.Query("min_latency_ms")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil || n < 0 {
return f, fmt.Errorf("invalid min_latency_ms")
}
f.MinLatencyMs = n
}
return f, nil
}
// writeOpsLogSSE writes one SSE frame: an `event: log` line followed by a
// single `data:` line containing the JSON-encoded entry, terminated by a
// blank line per the SSE protocol.
//
// This assumes the JSON payload contains no bare LF — which holds because
// every string field in OpsLogEntry passes through encoding/json escaping.
// If a future field is added that emits raw bytes (e.g. a []byte body),
// the marshalled output must be split across multiple `data:` lines.
func writeOpsLogSSE(w io.Writer, e *service.OpsLogEntry) error {
payload, err := json.Marshal(e)
if err != nil {
return err
}
if _, err := io.WriteString(w, "event: log\ndata: "); err != nil {
return err
}
if _, err := w.Write(payload); err != nil {
return err
}
_, err = io.WriteString(w, "\n\n")
return err
}

View File

@ -116,7 +116,7 @@ func newRuntimeOpsService(t *testing.T) *service.OpsService {
}
func TestOpsRuntimeLoggingHandler_GetConfig(t *testing.T) {
h := NewOpsHandler(newRuntimeOpsService(t), nil, nil)
h := NewOpsHandler(newRuntimeOpsService(t))
r := newOpsRuntimeRouter(h, false)
w := httptest.NewRecorder()
@ -128,7 +128,7 @@ func TestOpsRuntimeLoggingHandler_GetConfig(t *testing.T) {
}
func TestOpsRuntimeLoggingHandler_UpdateUnauthorized(t *testing.T) {
h := NewOpsHandler(newRuntimeOpsService(t), nil, nil)
h := NewOpsHandler(newRuntimeOpsService(t))
r := newOpsRuntimeRouter(h, false)
body := `{"level":"debug","enable_sampling":false,"sampling_initial":100,"sampling_thereafter":100,"caller":true,"stacktrace_level":"error","retention_days":30}`
@ -142,7 +142,7 @@ func TestOpsRuntimeLoggingHandler_UpdateUnauthorized(t *testing.T) {
}
func TestOpsRuntimeLoggingHandler_UpdateAndResetSuccess(t *testing.T) {
h := NewOpsHandler(newRuntimeOpsService(t), nil, nil)
h := NewOpsHandler(newRuntimeOpsService(t))
r := newOpsRuntimeRouter(h, true)
payload := map[string]any{

View File

@ -35,7 +35,7 @@ func newOpsSystemLogTestRouter(handler *OpsHandler, withUser bool) *gin.Engine {
}
func TestOpsSystemLogHandler_ListUnavailable(t *testing.T) {
h := NewOpsHandler(nil, nil, nil)
h := NewOpsHandler(nil)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -48,7 +48,7 @@ func TestOpsSystemLogHandler_ListUnavailable(t *testing.T) {
func TestOpsSystemLogHandler_ListInvalidUserID(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -61,7 +61,7 @@ func TestOpsSystemLogHandler_ListInvalidUserID(t *testing.T) {
func TestOpsSystemLogHandler_ListInvalidAccountID(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -76,7 +76,7 @@ func TestOpsSystemLogHandler_ListMonitoringDisabled(t *testing.T) {
svc := service.NewOpsService(nil, nil, &config.Config{
Ops: config.OpsConfig{Enabled: false},
}, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -89,7 +89,7 @@ func TestOpsSystemLogHandler_ListMonitoringDisabled(t *testing.T) {
func TestOpsSystemLogHandler_ListSuccess(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -110,7 +110,7 @@ func TestOpsSystemLogHandler_ListSuccess(t *testing.T) {
func TestOpsSystemLogHandler_CleanupUnauthorized(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -124,7 +124,7 @@ func TestOpsSystemLogHandler_CleanupUnauthorized(t *testing.T) {
func TestOpsSystemLogHandler_CleanupInvalidPayload(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, true)
w := httptest.NewRecorder()
@ -138,7 +138,7 @@ func TestOpsSystemLogHandler_CleanupInvalidPayload(t *testing.T) {
func TestOpsSystemLogHandler_CleanupInvalidTime(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, true)
w := httptest.NewRecorder()
@ -152,7 +152,7 @@ func TestOpsSystemLogHandler_CleanupInvalidTime(t *testing.T) {
func TestOpsSystemLogHandler_CleanupInvalidEndTime(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, true)
w := httptest.NewRecorder()
@ -166,7 +166,7 @@ func TestOpsSystemLogHandler_CleanupInvalidEndTime(t *testing.T) {
func TestOpsSystemLogHandler_CleanupServiceUnavailable(t *testing.T) {
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, true)
w := httptest.NewRecorder()
@ -182,7 +182,7 @@ func TestOpsSystemLogHandler_CleanupMonitoringDisabled(t *testing.T) {
svc := service.NewOpsService(nil, nil, &config.Config{
Ops: config.OpsConfig{Enabled: false},
}, nil, nil, nil, nil, nil, nil, nil, nil)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, true)
w := httptest.NewRecorder()
@ -197,7 +197,7 @@ func TestOpsSystemLogHandler_CleanupMonitoringDisabled(t *testing.T) {
func TestOpsSystemLogHandler_Health(t *testing.T) {
sink := service.NewOpsSystemLogSink(nil)
svc := service.NewOpsService(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, sink)
h := NewOpsHandler(svc, nil, nil)
h := NewOpsHandler(svc)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -209,7 +209,7 @@ func TestOpsSystemLogHandler_Health(t *testing.T) {
}
func TestOpsSystemLogHandler_HealthUnavailableAndMonitoringDisabled(t *testing.T) {
h := NewOpsHandler(nil, nil, nil)
h := NewOpsHandler(nil)
r := newOpsSystemLogTestRouter(h, false)
w := httptest.NewRecorder()
@ -222,7 +222,7 @@ func TestOpsSystemLogHandler_HealthUnavailableAndMonitoringDisabled(t *testing.T
svc := service.NewOpsService(nil, nil, &config.Config{
Ops: config.OpsConfig{Enabled: false},
}, nil, nil, nil, nil, nil, nil, nil, nil)
h = NewOpsHandler(svc, nil, nil)
h = NewOpsHandler(svc)
r = newOpsSystemLogTestRouter(h, false)
w = httptest.NewRecorder()
req = httptest.NewRequest(http.MethodGet, "/logs/health", nil)

View File

@ -1,198 +0,0 @@
package admin
import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
type requestStreamWSMessage struct {
Type string `json:"type"`
Data service.RequestEvent `json:"data"`
}
// RequestStreamWSHandler streams real-time request events to WebSocket clients.
// GET /api/v1/admin/ops/ws/requests
//
// Each connected client receives a JSON message per gateway dispatch:
//
// {"type":"request_event","data":{"timestamp":...,"method":"POST","path":"/v1/messages",
// "model":"claude-3-5-sonnet-20241022","account_id":42,"status":"success","latency_ms":1230}}
func (h *OpsHandler) RequestStreamWSHandler(c *gin.Context) {
clientIP := requestClientIP(c.Request)
if h == nil || h.opsService == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"})
return
}
if h.requestEventBus == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "request event bus not initialized"})
return
}
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"})
return
}
closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled")
return
}
if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer func() {
if wsConnCount.Add(-1) == 0 {
scheduleQPSWSIdleStop()
}
}()
if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" {
if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] per-ip limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer releaseOpsWSIPSlot(clientIP)
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] upgrade failed: %v", err)
return
}
defer func() { _ = conn.Close() }()
handleRequestStreamWebSocket(c.Request.Context(), conn, h.requestEventBus)
}
func handleRequestStreamWebSocket(parentCtx context.Context, conn *websocket.Conn, bus *service.RequestEventBus) {
if conn == nil || bus == nil {
return
}
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
subID, eventCh := bus.Subscribe()
defer bus.Unsubscribe(subID)
var closeOnce sync.Once
closeConn := func() {
closeOnce.Do(func() { _ = conn.Close() })
}
closeFrameCh := make(chan []byte, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
conn.SetReadLimit(qpsWSMaxReadBytes)
if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] set read deadline failed: %v", err)
return
}
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait))
})
conn.SetCloseHandler(func(code int, text string) error {
select {
case closeFrameCh <- websocket.FormatCloseMessage(code, text):
default:
}
cancel()
return nil
})
for {
_, _, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] read failed: %v", err)
}
return
}
}
}()
pingTicker := time.NewTicker(qpsWSPingInterval)
defer pingTicker.Stop()
writeWithTimeout := func(messageType int, data []byte) error {
if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil {
return err
}
return conn.WriteMessage(messageType, data)
}
sendClose := func(closeFrame []byte) {
if closeFrame == nil {
closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
}
_ = writeWithTimeout(websocket.CloseMessage, closeFrame)
}
for {
select {
case evt, ok := <-eventCh:
if !ok {
// channel closed by Unsubscribe
sendClose(nil)
closeConn()
wg.Wait()
return
}
msg, err := json.Marshal(requestStreamWSMessage{Type: "request_event", Data: evt})
if err != nil {
continue
}
if err := writeWithTimeout(websocket.TextMessage, msg); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] write failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case <-pingTicker.C:
if err := writeWithTimeout(websocket.PingMessage, nil); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] ping failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case closeFrame := <-closeFrameCh:
sendClose(closeFrame)
closeConn()
wg.Wait()
return
case <-ctx.Done():
var closeFrame []byte
select {
case closeFrame = <-closeFrameCh:
default:
}
sendClose(closeFrame)
closeConn()
wg.Wait()
return
}
}
}

View File

@ -50,7 +50,6 @@ type GatewayHandler struct {
contentModerationService *service.ContentModerationService
concurrencyHelper *ConcurrencyHelper
userMsgQueueHelper *UserMsgQueueHelper
requestEventBus *service.RequestEventBus
maxAccountSwitches int
maxAccountSwitchesGemini int
cfg *config.Config
@ -74,7 +73,6 @@ func NewGatewayHandler(
userMsgQueueService *service.UserMessageQueueService,
cfg *config.Config,
settingService *service.SettingService,
requestEventBus *service.RequestEventBus,
) *GatewayHandler {
pingInterval := time.Duration(0)
maxAccountSwitches := 10
@ -109,7 +107,6 @@ func NewGatewayHandler(
contentModerationService: contentModerationService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
userMsgQueueHelper: umqHelper,
requestEventBus: requestEventBus,
maxAccountSwitches: maxAccountSwitches,
maxAccountSwitchesGemini: maxAccountSwitchesGemini,
cfg: cfg,
@ -120,7 +117,6 @@ func NewGatewayHandler(
// Messages handles Claude API compatible messages endpoint
// POST /v1/messages
func (h *GatewayHandler) Messages(c *gin.Context) {
reqStartTime := time.Now()
// 从context获取apiKey和userApiKeyAuth中间件已设置
apiKey, ok := middleware2.GetAPIKeyFromContext(c)
if !ok {
@ -172,25 +168,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 解析渠道级模型映射
channelMapping, _ := h.gatewayService.ResolveChannelMappingAndRestrict(c.Request.Context(), apiKey.GroupID, reqModel)
// 实时请求查看器:记录每次请求的结果(账号、模型、状态、延迟)
var (
reqEventAccountID int64
reqEventStatus = "error"
)
defer func() {
if h.requestEventBus != nil {
h.requestEventBus.Publish(service.RequestEvent{
Timestamp: reqStartTime,
Method: c.Request.Method,
Path: c.FullPath(),
Model: reqModel,
AccountID: reqEventAccountID,
Status: reqEventStatus,
LatencyMS: time.Since(reqStartTime).Milliseconds(),
})
}
}()
// 设置 max_tokens=1 + haiku 探测请求标识到 context 中
// 必须在 SetClaudeCodeClientContext 之前设置,因为 ClaudeCodeValidator 需要读取此标识进行绕过判断
if isMaxTokensOneHaikuRequest(reqModel, parsedReq.MaxTokens, reqStream) {
@ -463,8 +440,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if accountReleaseFunc != nil {
accountReleaseFunc()
}
reqEventAccountID = account.ID
reqEventStatus = "rate_limited"
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "RPM rate limit exceeded, please retry later", streamStarted)
return
}
@ -537,10 +512,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
// 实时请求查看器:标记 Gemini 路径成功
reqEventAccountID = account.ID
reqEventStatus = "success"
// RPM 计数递增Forward 成功后)
// 注意TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。
// 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。
@ -750,8 +721,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if accountReleaseFunc != nil {
accountReleaseFunc()
}
reqEventAccountID = account.ID
reqEventStatus = "rate_limited"
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "RPM rate limit exceeded, please retry later", streamStarted)
return
}
@ -957,10 +926,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
// 实时请求查看器:标记 Anthropic 路径成功
reqEventAccountID = account.ID
reqEventStatus = "success"
// RPM 计数递增Forward 成功后)
// 注意TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。
// 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。

View File

@ -1,100 +0,0 @@
package handler
import (
"strings"
"time"
"github.com/gin-gonic/gin"
servermiddleware "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
)
// OpsLogStreamMiddleware fans every gateway request out to the in-memory
// OpsLogBroadcaster so admin tools can subscribe to a real-time SSE feed.
//
// This is intentionally separate from OpsErrorLoggerMiddleware:
// - OpsErrorLoggerMiddleware persists 4xx/5xx into the database for audit.
// - This middleware streams every request (success + failure) for live UX.
//
// The broadcaster.Publish call is non-blocking by design (see the
// implementation): a slow/missing subscriber NEVER stalls the request path.
// Empty broadcaster (nil receiver, or no subscribers) is a no-op.
func OpsLogStreamMiddleware(b *service.OpsLogBroadcaster) gin.HandlerFunc {
if b == nil {
return func(c *gin.Context) { c.Next() }
}
return func(c *gin.Context) {
start := time.Now()
c.Next()
entry := service.OpsLogEntry{
Time: start,
Method: c.Request.Method,
Path: c.Request.URL.Path,
Status: c.Writer.Status(),
LatencyMs: time.Since(start).Milliseconds(),
}
if v, ok := c.Get(opsModelKey); ok {
if s, ok := v.(string); ok {
entry.Model = s
}
}
if v, ok := c.Get(opsStreamKey); ok {
if streamFlag, ok := v.(bool); ok {
entry.Stream = streamFlag
}
}
if v, ok := c.Get(opsAccountIDKey); ok {
switch t := v.(type) {
case int64:
entry.AccountID = t
case int:
entry.AccountID = int64(t)
}
}
// Best-effort api-key + group + user from middleware context.
if apiKey, ok := servermiddleware.GetAPIKeyFromContext(c); ok && apiKey != nil {
entry.APIKeyID = apiKey.ID
if apiKey.GroupID != nil {
entry.GroupID = *apiKey.GroupID
}
entry.UserID = apiKey.UserID
}
// Pull upstream error context (set by gateway services on retries).
if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok {
switch t := v.(type) {
case int:
entry.UpstreamCode = t
case int64:
entry.UpstreamCode = int(t)
}
}
if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok {
if s, ok := v.(string); ok {
entry.ErrorMessage = trimForStream(s)
}
}
if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok {
if s, ok := v.(string); ok {
entry.ErrorDetail = trimForStream(s)
}
}
b.Publish(entry)
}
}
// trimForStream caps long error strings so a single broken upstream cannot
// flood the SSE channel with megabyte-sized error blobs.
func trimForStream(s string) string {
const max = 512
s = strings.TrimSpace(s)
if len(s) > max {
return s[:max] + "…"
}
return s
}

View File

@ -40,7 +40,6 @@ func ProvideRouter(
settingService *service.SettingService,
healthService *service.HealthService,
redisClient *redis.Client,
opsLogBroadcaster *service.OpsLogBroadcaster,
) *gin.Engine {
if cfg.Server.Mode == "release" {
gin.SetMode(gin.ReleaseMode)
@ -97,7 +96,7 @@ func ProvideRouter(
service.SetWebSearchManager(websearch.NewManager(configs, redisClient))
})
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient, opsLogBroadcaster)
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient)
}
// ProvideHTTPServer 提供 HTTP 服务器

View File

@ -33,7 +33,6 @@ func SetupRouter(
healthService *service.HealthService,
cfg *config.Config,
redisClient *redis.Client,
opsLogBroadcaster *service.OpsLogBroadcaster,
) *gin.Engine {
// 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src
var cachedFrameOrigins atomic.Pointer[[]string]
@ -83,7 +82,7 @@ func SetupRouter(
}
// 注册路由
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient, opsLogBroadcaster)
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, healthService, cfg, redisClient)
return r
}
@ -102,7 +101,6 @@ func registerRoutes(
healthService *service.HealthService,
cfg *config.Config,
redisClient *redis.Client,
opsLogBroadcaster *service.OpsLogBroadcaster,
) {
// 通用路由(健康检查、状态等)
routes.RegisterCommonRoutes(r, healthService)
@ -114,10 +112,10 @@ func registerRoutes(
routes.RegisterAuthRoutes(v1, h, jwtAuth, redisClient, settingService)
routes.RegisterUserRoutes(v1, h, jwtAuth, settingService)
routes.RegisterAdminRoutes(v1, h, adminAuth)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, opsLogBroadcaster)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg)
// Windsurf gateway routes
routes.RegisterWindsurfGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, opsLogBroadcaster)
routes.RegisterWindsurfGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg)
routes.RegisterPaymentRoutes(v1, h.Payment, h.PaymentWebhook, h.Admin.Payment, jwtAuth, adminAuth, settingService)

View File

@ -132,8 +132,6 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
ops.GET("/user-concurrency", h.Admin.Ops.GetUserConcurrencyStats)
ops.GET("/account-availability", h.Admin.Ops.GetAccountAvailability)
ops.GET("/realtime-traffic", h.Admin.Ops.GetRealtimeTrafficSummary)
ops.GET("/logs/stream", h.Admin.Ops.LogStream)
ops.GET("/logs/recent", h.Admin.Ops.LogStreamRecent)
// Alerts (rules + events)
ops.GET("/alert-rules", h.Admin.Ops.ListAlertRules)
@ -170,11 +168,10 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
settings.PUT("/metric-thresholds", h.Admin.Ops.UpdateMetricThresholds)
}
// WebSocket realtime (QPS/TPS and request stream)
// WebSocket realtime (QPS/TPS)
ws := ops.Group("/ws")
{
ws.GET("/qps", h.Admin.Ops.QPSWSHandler)
ws.GET("/requests", h.Admin.Ops.RequestStreamWSHandler)
}
// Error logs (legacy)

View File

@ -21,12 +21,10 @@ func RegisterGatewayRoutes(
opsService *service.OpsService,
settingService *service.SettingService,
cfg *config.Config,
opsLogBroadcaster *service.OpsLogBroadcaster,
) {
bodyLimit := middleware.RequestBodyLimit(cfg.Gateway.MaxBodySize)
clientRequestID := middleware.ClientRequestID()
opsErrorLogger := handler.OpsErrorLoggerMiddleware(opsService)
opsLogStream := handler.OpsLogStreamMiddleware(opsLogBroadcaster)
endpointNorm := handler.InboundEndpointMiddleware()
// 未分组 Key 拦截中间件(按协议格式区分错误响应)
@ -38,7 +36,6 @@ func RegisterGatewayRoutes(
gateway.Use(bodyLimit)
gateway.Use(clientRequestID)
gateway.Use(opsErrorLogger)
gateway.Use(opsLogStream)
gateway.Use(endpointNorm)
gateway.Use(gin.HandlerFunc(apiKeyAuth))
gateway.Use(requireGroupAnthropic)

View File

@ -37,7 +37,6 @@ func newGatewayRoutesTestRouter() *gin.Engine {
nil,
nil,
&config.Config{},
nil,
)
return router

View File

@ -18,7 +18,6 @@ func RegisterWindsurfGatewayRoutes(
opsService *service.OpsService,
settingService *service.SettingService,
cfg *config.Config,
opsLogBroadcaster *service.OpsLogBroadcaster,
) {
if h.Gateway == nil {
return
@ -27,7 +26,6 @@ func RegisterWindsurfGatewayRoutes(
bodyLimit := middleware.RequestBodyLimit(cfg.Gateway.MaxBodySize)
clientRequestID := middleware.ClientRequestID()
opsErrorLogger := handler.OpsErrorLoggerMiddleware(opsService)
opsLogStream := handler.OpsLogStreamMiddleware(opsLogBroadcaster)
endpointNorm := handler.InboundEndpointMiddleware()
requireGroupAnthropic := middleware.RequireGroupAssignment(settingService, middleware.AnthropicErrorWriter)
@ -35,7 +33,6 @@ func RegisterWindsurfGatewayRoutes(
windsurfV1.Use(bodyLimit)
windsurfV1.Use(clientRequestID)
windsurfV1.Use(opsErrorLogger)
windsurfV1.Use(opsLogStream)
windsurfV1.Use(endpointNorm)
windsurfV1.Use(middleware.ForcePlatform(service.PlatformWindsurf))
windsurfV1.Use(gin.HandlerFunc(apiKeyAuth))

View File

@ -1,237 +0,0 @@
// Package service exposes domain services. opslog provides a lightweight
// in-memory pub/sub for streaming admin log events without persisting them.
package service
import (
"sync"
"sync/atomic"
"time"
)
// OpsLogEntry is one streamed log event. All fields are optional except Time
// — any missing data simply renders as empty in the admin UI.
type OpsLogEntry struct {
Time time.Time `json:"time"`
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
Status int `json:"status"`
LatencyMs int64 `json:"latency_ms"`
Model string `json:"model,omitempty"`
Stream bool `json:"stream,omitempty"`
AccountID int64 `json:"account_id,omitempty"`
GroupID int64 `json:"group_id,omitempty"`
APIKeyID int64 `json:"api_key_id,omitempty"`
UserID int64 `json:"user_id,omitempty"`
Turns int `json:"turns,omitempty"`
PromptChars int `json:"prompt_chars,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
ErrorDetail string `json:"error_detail,omitempty"`
UpstreamCode int `json:"upstream_status,omitempty"`
}
// OpsLogFilter restricts which entries a subscriber receives. Empty fields
// match everything; non-empty fields are AND-combined.
type OpsLogFilter struct {
MinStatus int
Model string
AccountID int64
GroupID int64
MinLatencyMs int64
}
// matches reports whether the entry passes the filter.
func (f OpsLogFilter) matches(e *OpsLogEntry) bool {
if f.MinStatus > 0 && e.Status < f.MinStatus {
return false
}
if f.Model != "" && e.Model != f.Model {
return false
}
if f.AccountID > 0 && e.AccountID != f.AccountID {
return false
}
if f.GroupID > 0 && e.GroupID != f.GroupID {
return false
}
if f.MinLatencyMs > 0 && e.LatencyMs < f.MinLatencyMs {
return false
}
return true
}
// OpsLogBroadcaster is a lock-free-ish fan-out broadcaster with a bounded
// ring buffer for history (so freshly connected clients can prime their UI)
// and per-subscriber non-blocking sends (so a slow client never stalls a
// publish on the hot request path).
type OpsLogBroadcaster struct {
subsMu sync.RWMutex
subscribers map[int64]*opsLogSubscription
nextID atomic.Int64
historyMu sync.Mutex
history []OpsLogEntry
histHead int
histLen int
histCap int
// publishedTotal / droppedTotal expose simple ops counters for an
// admin dashboard cell. Atomic so callers don't need to lock.
publishedTotal atomic.Int64
droppedTotal atomic.Int64
}
type opsLogSubscription struct {
ch chan OpsLogEntry
filter OpsLogFilter
// closed is set atomically by unsubscribe() before any cleanup. Publish
// reads this flag under subsMu.RLock and skips closed subscriptions
// instead of attempting a send-on-closed-channel (which would panic).
closed atomic.Bool
}
// NewOpsLogBroadcaster constructs a broadcaster. historyCap controls how
// many recent entries are kept for newly connected clients; 1000 is a sane
// default. Pass historyCap<=0 to disable the buffer entirely.
func NewOpsLogBroadcaster(historyCap int) *OpsLogBroadcaster {
if historyCap < 0 {
historyCap = 0
}
b := &OpsLogBroadcaster{
subscribers: make(map[int64]*opsLogSubscription),
histCap: historyCap,
}
if historyCap > 0 {
b.history = make([]OpsLogEntry, historyCap)
}
return b
}
// Publish fans the entry out to every matching subscriber and appends it to
// the history buffer. Never blocks: if a subscriber's channel is full, the
// entry is dropped for that subscriber and the broadcaster's drop counter
// is incremented. Hot-path safe.
//
// The same entry value is delivered (by value) to all subscribers and to
// the ring buffer — no shared mutable pointer is leaked, so subscribers
// holding references to past entries cannot observe later mutations.
func (b *OpsLogBroadcaster) Publish(entry OpsLogEntry) {
if entry.Time.IsZero() {
entry.Time = time.Now()
}
b.publishedTotal.Add(1)
b.appendHistory(entry)
b.subsMu.RLock()
subs := make([]*opsLogSubscription, 0, len(b.subscribers))
for _, s := range b.subscribers {
subs = append(subs, s)
}
b.subsMu.RUnlock()
for _, s := range subs {
// Skip subscriptions that have been unsubscribed since we snapped
// the list. Without this check, a concurrent unsubscribe → close(ch)
// would race the send below and panic on send-to-closed-channel.
if s.closed.Load() {
continue
}
if !s.filter.matches(&entry) {
continue
}
select {
case s.ch <- entry:
default:
b.droppedTotal.Add(1)
}
}
}
// Subscribe registers a new listener. The returned channel is buffered;
// callers MUST drain it. Cancel by calling the returned unsubscribe func
// (idempotent and safe from any goroutine).
//
// IMPORTANT: unsubscribe does NOT close the channel. Closing it would race
// with concurrent Publish goroutines that may already be holding a snapshot
// of the subscription pointer (causing a panic on send-to-closed-channel).
// Instead, unsubscribe (a) sets the closed atomic flag — Publish skips sends
// to flagged subs — and (b) removes from the subscriber map. Any in-flight
// send that slips past the flag check still proceeds harmlessly into the
// channel buffer and is garbage-collected with the channel once the caller
// drops its reference. Subscribers that need to know the broadcaster is
// done with them should rely on the parent ctx, not channel close.
func (b *OpsLogBroadcaster) Subscribe(filter OpsLogFilter, bufSize int) (<-chan OpsLogEntry, func()) {
if bufSize <= 0 {
bufSize = 1024
}
id := b.nextID.Add(1)
sub := &opsLogSubscription{
ch: make(chan OpsLogEntry, bufSize),
filter: filter,
}
b.subsMu.Lock()
b.subscribers[id] = sub
b.subsMu.Unlock()
var unsubOnce sync.Once
unsubscribe := func() {
unsubOnce.Do(func() {
sub.closed.Store(true)
b.subsMu.Lock()
delete(b.subscribers, id)
b.subsMu.Unlock()
})
}
return sub.ch, unsubscribe
}
// Snapshot returns a copy of the recent history (oldest → newest), filtered
// by the given filter. Used by /admin/ops/logs/recent to prime newly opened
// dashboards before live events arrive.
func (b *OpsLogBroadcaster) Snapshot(filter OpsLogFilter, maxEntries int) []OpsLogEntry {
b.historyMu.Lock()
defer b.historyMu.Unlock()
if b.histLen == 0 {
return nil
}
out := make([]OpsLogEntry, 0, b.histLen)
start := b.histHead - b.histLen
if start < 0 {
start += b.histCap
}
for i := 0; i < b.histLen; i++ {
idx := (start + i) % b.histCap
e := b.history[idx]
if !filter.matches(&e) {
continue
}
out = append(out, e)
}
if maxEntries > 0 && len(out) > maxEntries {
out = out[len(out)-maxEntries:]
}
return out
}
// Stats reports cumulative publish/drop counts and the current subscriber
// count for diagnostics.
func (b *OpsLogBroadcaster) Stats() (published, dropped int64, subscribers int) {
b.subsMu.RLock()
subscribers = len(b.subscribers)
b.subsMu.RUnlock()
return b.publishedTotal.Load(), b.droppedTotal.Load(), subscribers
}
func (b *OpsLogBroadcaster) appendHistory(e OpsLogEntry) {
if b.histCap == 0 {
return
}
b.historyMu.Lock()
defer b.historyMu.Unlock()
b.history[b.histHead] = e
b.histHead = (b.histHead + 1) % b.histCap
if b.histLen < b.histCap {
b.histLen++
}
}

View File

@ -1,267 +0,0 @@
package service
import (
"sync"
"testing"
"time"
)
func TestOpsLogBroadcaster_FanOutDeliversToMatchingSubscribers(t *testing.T) {
b := NewOpsLogBroadcaster(16)
chHigh, unHigh := b.Subscribe(OpsLogFilter{MinStatus: 500}, 8)
defer unHigh()
chAll, unAll := b.Subscribe(OpsLogFilter{}, 8)
defer unAll()
b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6"})
b.Publish(OpsLogEntry{Status: 503, Model: "kimi-k2.5"})
got200 := receiveOrTimeout(t, chAll, 200*time.Millisecond)
got503 := receiveOrTimeout(t, chAll, 200*time.Millisecond)
if got200.Status != 200 || got503.Status != 503 {
t.Fatalf("unexpected fan-out: %d / %d", got200.Status, got503.Status)
}
gotHigh := receiveOrTimeout(t, chHigh, 200*time.Millisecond)
if gotHigh.Status != 503 {
t.Fatalf("filter MinStatus=500 should drop 200, got %d", gotHigh.Status)
}
expectNoMessage(t, chHigh, 50*time.Millisecond)
}
func TestOpsLogBroadcaster_FilterByModelAndAccount(t *testing.T) {
b := NewOpsLogBroadcaster(0)
chKimi, unKimi := b.Subscribe(OpsLogFilter{Model: "kimi-k2.5"}, 4)
defer unKimi()
chAcct, unAcct := b.Subscribe(OpsLogFilter{AccountID: 42}, 4)
defer unAcct()
b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6", AccountID: 1})
b.Publish(OpsLogEntry{Status: 200, Model: "kimi-k2.5", AccountID: 42})
got := receiveOrTimeout(t, chKimi, 200*time.Millisecond)
if got.Model != "kimi-k2.5" {
t.Fatalf("expected kimi entry, got %+v", got)
}
expectNoMessage(t, chKimi, 50*time.Millisecond)
gotA := receiveOrTimeout(t, chAcct, 200*time.Millisecond)
if gotA.AccountID != 42 {
t.Fatalf("expected account 42, got %d", gotA.AccountID)
}
}
func TestOpsLogBroadcaster_NeverBlocksOnSlowSubscriber(t *testing.T) {
b := NewOpsLogBroadcaster(0)
// Subscriber with buffer=1, never reads. After the second Publish,
// the entry must be dropped instead of blocking the publisher.
_, unsub := b.Subscribe(OpsLogFilter{}, 1)
defer unsub()
done := make(chan struct{})
go func() {
for i := 0; i < 100; i++ {
b.Publish(OpsLogEntry{Status: 200})
}
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("publisher blocked on slow subscriber")
}
_, dropped, _ := b.Stats()
if dropped == 0 {
t.Fatal("expected dropped count > 0 when subscriber buffer overflows")
}
}
func TestOpsLogBroadcaster_HistorySnapshot(t *testing.T) {
b := NewOpsLogBroadcaster(3)
for i := 1; i <= 5; i++ {
b.Publish(OpsLogEntry{Status: 200 + i})
}
got := b.Snapshot(OpsLogFilter{}, 0)
if len(got) != 3 {
t.Fatalf("expected ring of 3, got %d", len(got))
}
// Oldest → newest
if got[0].Status != 203 || got[1].Status != 204 || got[2].Status != 205 {
t.Fatalf("expected 203/204/205, got %d/%d/%d", got[0].Status, got[1].Status, got[2].Status)
}
}
func TestOpsLogBroadcaster_HistoryAppliesFilter(t *testing.T) {
b := NewOpsLogBroadcaster(8)
b.Publish(OpsLogEntry{Status: 200})
b.Publish(OpsLogEntry{Status: 500})
b.Publish(OpsLogEntry{Status: 503})
got := b.Snapshot(OpsLogFilter{MinStatus: 500}, 0)
if len(got) != 2 {
t.Fatalf("expected 2 high-status entries, got %d: %+v", len(got), got)
}
}
func TestOpsLogBroadcaster_UnsubscribeIdempotent(t *testing.T) {
b := NewOpsLogBroadcaster(0)
_, unsub := b.Subscribe(OpsLogFilter{}, 1)
unsub()
unsub() // second call must not panic
unsub() // and a third
}
func TestOpsLogBroadcaster_ZeroTimeFilledIn(t *testing.T) {
b := NewOpsLogBroadcaster(2)
b.Publish(OpsLogEntry{Status: 200}) // Time intentionally zero
got := b.Snapshot(OpsLogFilter{}, 0)
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
if got[0].Time.IsZero() {
t.Fatal("Publish should populate zero Time with time.Now()")
}
}
func TestOpsLogBroadcaster_ConcurrentSafe(t *testing.T) {
b := NewOpsLogBroadcaster(64)
// Spin a few subscribers and producers; rely on -race to surface
// any concurrency bugs in the fan-out path.
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
ch, unsub := b.Subscribe(OpsLogFilter{}, 32)
wg.Add(1)
go func() {
defer wg.Done()
defer unsub()
deadline := time.Now().Add(200 * time.Millisecond)
for time.Now().Before(deadline) {
select {
case <-ch:
case <-time.After(5 * time.Millisecond):
}
}
}()
}
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 200; j++ {
b.Publish(OpsLogEntry{Status: 200, Model: "x"})
}
}()
}
wg.Wait()
pub, _, _ := b.Stats()
if pub != 800 {
t.Fatalf("expected 800 publishes, got %d", pub)
}
}
// TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic exercises the exact
// race the audit identified: a Publish goroutine has snapped a subscription
// pointer while another goroutine unsubscribes (close(ch)) the moment before
// the send. Without the closed-flag guard in Publish, this races into
// "send on closed channel" and panics. With the guard, Publish observes
// closed=true and skips the send. Run with -race.
func TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic(t *testing.T) {
b := NewOpsLogBroadcaster(0)
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 200; j++ {
ch, unsub := b.Subscribe(OpsLogFilter{}, 1)
// Drain non-blockingly until the next publish lands.
done := make(chan struct{})
go func() {
defer close(done)
timer := time.NewTimer(50 * time.Millisecond)
defer timer.Stop()
for {
select {
case <-ch:
case <-timer.C:
return
}
}
}()
b.Publish(OpsLogEntry{Status: 200})
unsub()
b.Publish(OpsLogEntry{Status: 200}) // post-unsub publish must not panic
<-done
}
}()
}
wg.Wait()
}
// TestOpsLogBroadcaster_SnapshotConcurrentWithPublish ensures Snapshot is
// safe under concurrent Publish (verifies subsMu vs historyMu coexistence).
func TestOpsLogBroadcaster_SnapshotConcurrentWithPublish(t *testing.T) {
b := NewOpsLogBroadcaster(32)
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
b.Publish(OpsLogEntry{Status: 200})
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 200; i++ {
_ = b.Snapshot(OpsLogFilter{}, 0)
}
}()
time.Sleep(50 * time.Millisecond)
close(stop)
wg.Wait()
}
// helpers --------------------------------------------------------------
func receiveOrTimeout(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) OpsLogEntry {
t.Helper()
select {
case e := <-ch:
return e
case <-time.After(d):
t.Fatalf("timeout waiting for entry after %s", d)
}
return OpsLogEntry{}
}
func expectNoMessage(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) {
t.Helper()
select {
case e := <-ch:
t.Fatalf("unexpected message: %+v", e)
case <-time.After(d):
}
}

View File

@ -1,75 +0,0 @@
package service
import (
"sync"
"sync/atomic"
"time"
)
const requestEventBufSize = 64
// RequestEvent is published for every gateway dispatch completion.
type RequestEvent struct {
Timestamp time.Time `json:"timestamp"`
Method string `json:"method"`
Path string `json:"path"`
Model string `json:"model"`
AccountID int64 `json:"account_id"`
// Status is "success", "error", or "rate_limited".
Status string `json:"status"`
LatencyMS int64 `json:"latency_ms"`
}
// RequestEventBus is a fan-out hub for real-time request events.
// Publishers call Publish; subscribers call Subscribe/Unsubscribe.
// Each subscriber gets its own buffered channel. If the buffer is full
// the event is dropped for that subscriber (non-blocking publish).
type RequestEventBus struct {
mu sync.RWMutex
subscribers map[uint64]chan RequestEvent
nextID atomic.Uint64
}
func NewRequestEventBus() *RequestEventBus {
return &RequestEventBus{
subscribers: make(map[uint64]chan RequestEvent),
}
}
// Subscribe registers a new subscriber and returns its ID and a receive-only channel.
func (b *RequestEventBus) Subscribe() (uint64, <-chan RequestEvent) {
id := b.nextID.Add(1)
ch := make(chan RequestEvent, requestEventBufSize)
b.mu.Lock()
b.subscribers[id] = ch
b.mu.Unlock()
return id, ch
}
// Unsubscribe removes a subscriber and closes its channel.
func (b *RequestEventBus) Unsubscribe(id uint64) {
b.mu.Lock()
ch, ok := b.subscribers[id]
if ok {
delete(b.subscribers, id)
}
b.mu.Unlock()
if ok {
close(ch)
}
}
// Publish sends an event to all current subscribers without blocking.
func (b *RequestEventBus) Publish(e RequestEvent) {
if b == nil {
return
}
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.subscribers {
select {
case ch <- e:
default:
}
}
}

View File

@ -1,100 +0,0 @@
package service
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRequestEventBus_PublishToSubscriber(t *testing.T) {
bus := NewRequestEventBus()
id, ch := bus.Subscribe()
defer bus.Unsubscribe(id)
evt := RequestEvent{Model: "claude-3", Status: "success", LatencyMS: 100}
bus.Publish(evt)
select {
case got := <-ch:
assert.Equal(t, evt, got)
case <-time.After(time.Second):
t.Fatal("timed out waiting for event")
}
}
func TestRequestEventBus_MultipleSubscribers(t *testing.T) {
bus := NewRequestEventBus()
id1, ch1 := bus.Subscribe()
id2, ch2 := bus.Subscribe()
defer bus.Unsubscribe(id1)
defer bus.Unsubscribe(id2)
evt := RequestEvent{Model: "claude-3", Status: "error"}
bus.Publish(evt)
for _, ch := range []<-chan RequestEvent{ch1, ch2} {
select {
case got := <-ch:
assert.Equal(t, evt, got)
case <-time.After(time.Second):
t.Fatal("timed out waiting for event on one subscriber")
}
}
}
func TestRequestEventBus_UnsubscribeClosesChannel(t *testing.T) {
bus := NewRequestEventBus()
id, ch := bus.Subscribe()
bus.Unsubscribe(id)
// Channel should be closed.
_, ok := <-ch
assert.False(t, ok, "channel should be closed after Unsubscribe")
}
func TestRequestEventBus_UnsubscribedMissesEvents(t *testing.T) {
bus := NewRequestEventBus()
id, _ := bus.Subscribe()
bus.Unsubscribe(id)
// Publish after unsubscribe should not panic.
require.NotPanics(t, func() {
bus.Publish(RequestEvent{Model: "test"})
})
}
func TestRequestEventBus_DropWhenFull(t *testing.T) {
bus := NewRequestEventBus()
id, ch := bus.Subscribe()
defer bus.Unsubscribe(id)
// Fill the buffer then publish one more — should drop, not block.
evt := RequestEvent{Model: "model", Status: "success"}
for i := 0; i < requestEventBufSize; i++ {
bus.Publish(evt)
}
// This publish should return immediately (dropped).
done := make(chan struct{})
go func() {
bus.Publish(evt)
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Publish blocked when buffer was full")
}
assert.Len(t, ch, requestEventBufSize)
}
func TestRequestEventBus_NilSafePublish(t *testing.T) {
var bus *RequestEventBus
require.NotPanics(t, func() {
bus.Publish(RequestEvent{Model: "test"})
})
}

View File

@ -453,7 +453,6 @@ var ProviderSet = wire.NewSet(
NewAnnouncementService,
NewAdminService,
NewRPMTokenBucketService,
NewRequestEventBus,
NewGatewayService,
NewOpenAIGatewayService,
NewOAuthService,
@ -533,7 +532,6 @@ var ProviderSet = wire.NewSet(
ProvideWindsurfRefreshService,
ProvideWindsurfProbeService,
ProvideWindsurfTierAccessService,
ProvideOpsLogBroadcaster,
ProvideChannelMonitorService,
ProvideChannelMonitorRunner,
NewChannelMonitorRequestTemplateService,
@ -556,17 +554,6 @@ func ProvideWindsurfTierAccessService(cfg *config.Config, accountRepo AccountRep
return NewWindsurfTierAccessService(accountRepo)
}
// ProvideOpsLogBroadcaster builds the in-memory ops log fan-out.
//
// Always returns a non-nil broadcaster — middleware/handler call sites
// rely on a stable receiver and gracefully no-op when the feature is
// effectively disabled (e.g. monitoring globally turned off via OpsService).
// History capacity is fixed at 1000 entries to bound memory.
func ProvideOpsLogBroadcaster() *OpsLogBroadcaster {
const historyCap = 1000
return NewOpsLogBroadcaster(historyCap)
}
// ProvideWindsurfLSService creates WindsurfLSService (nil when windsurf is disabled).
func ProvideWindsurfLSService(cfg *config.Config) *WindsurfLSService {
if !cfg.Windsurf.Enabled {

View File

@ -1324,190 +1324,7 @@ export const opsAPI = {
updateMetricThresholds,
listSystemLogs,
cleanupSystemLogs,
getSystemLogSinkHealth,
getRecentOpsLogs,
subscribeOpsLogStream
getSystemLogSinkHealth
}
export default opsAPI
// ===== Real-time ops log stream =====================================
export interface OpsLogEntry {
time: string
method?: string
path?: string
status: number
latency_ms: number
model?: string
stream?: boolean
account_id?: number
group_id?: number
api_key_id?: number
user_id?: number
turns?: number
prompt_chars?: number
error_message?: string
error_detail?: string
upstream_status?: number
}
export interface OpsLogFilter {
min_status?: number
model?: string
account_id?: number
group_id?: number
min_latency_ms?: number
}
export interface OpsLogRecentResponse {
entries: OpsLogEntry[]
published_total: number
dropped_total: number
subscribers: number
}
function buildLogQuery(filter: OpsLogFilter): string {
const params = new URLSearchParams()
if (filter.min_status && filter.min_status > 0) params.set('min_status', String(filter.min_status))
if (filter.model) params.set('model', filter.model)
if (filter.account_id && filter.account_id > 0) params.set('account_id', String(filter.account_id))
if (filter.group_id && filter.group_id > 0) params.set('group_id', String(filter.group_id))
if (filter.min_latency_ms && filter.min_latency_ms > 0) params.set('min_latency_ms', String(filter.min_latency_ms))
const qs = params.toString()
return qs ? `?${qs}` : ''
}
export async function getRecentOpsLogs(
filter: OpsLogFilter = {},
max?: number
): Promise<OpsLogRecentResponse> {
const params: Record<string, string | number> = {}
if (filter.min_status) params.min_status = filter.min_status
if (filter.model) params.model = filter.model
if (filter.account_id) params.account_id = filter.account_id
if (filter.group_id) params.group_id = filter.group_id
if (filter.min_latency_ms) params.min_latency_ms = filter.min_latency_ms
if (max) params.max = max
const { data } = await apiClient.get<OpsLogRecentResponse>('/admin/ops/logs/recent', { params })
return data
}
export interface SubscribeLogsOptions {
onEntry: (entry: OpsLogEntry) => void
onStatus?: (status: 'connecting' | 'live' | 'closed' | 'error') => void
onError?: (err: Error) => void
}
export type LogStreamHandle = {
close: () => void
}
/**
* Subscribe to /admin/ops/logs/stream via fetch + ReadableStream.
*
* EventSource doesn't allow custom headers, so we cannot use it for our
* Bearer-token authenticated SSE endpoint. fetch with `accept: text/event-stream`
* gets us the same wire protocol with Authorization support.
*
* The returned handle is idempotent calling close() multiple times is safe.
*/
export function subscribeOpsLogStream(
filter: OpsLogFilter,
opts: SubscribeLogsOptions
): LogStreamHandle {
const ctrl = new AbortController()
let closed = false
const close = () => {
if (closed) return
closed = true
ctrl.abort()
opts.onStatus?.('closed')
}
const run = async () => {
opts.onStatus?.('connecting')
const baseURL = (apiClient.defaults.baseURL ?? '/api/v1').replace(/\/+$/, '')
const url = `${baseURL}/admin/ops/logs/stream${buildLogQuery(filter)}`
const token = localStorage.getItem('auth_token') ?? ''
let resp: Response
try {
resp = await fetch(url, {
method: 'GET',
signal: ctrl.signal,
credentials: 'include',
headers: {
accept: 'text/event-stream',
...(token ? { Authorization: `Bearer ${token}` } : {})
}
})
} catch (e: any) {
if (closed || ctrl.signal.aborted) return
opts.onStatus?.('error')
opts.onError?.(e instanceof Error ? e : new Error(String(e)))
return
}
if (!resp.ok || !resp.body) {
opts.onStatus?.('error')
opts.onError?.(new Error(`SSE ${resp.status} ${resp.statusText}`))
return
}
opts.onStatus?.('live')
const reader = resp.body.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
try {
while (!closed) {
const { done, value } = await reader.read()
if (done) break
// Normalize CRLF and bare CR to LF before scanning. The SSE wire
// format permits \r\n line endings; without this, a trailing \r
// would leak into the JSON payload and silently break JSON.parse.
buffer += decoder.decode(value, { stream: true }).replace(/\r\n?/g, '\n')
// SSE events are separated by a blank line.
let sep: number
while ((sep = buffer.indexOf('\n\n')) !== -1) {
const rawEvent = buffer.slice(0, sep)
buffer = buffer.slice(sep + 2)
let dataLine = ''
for (const line of rawEvent.split('\n')) {
if (line.startsWith(':')) continue // comment / heartbeat
if (line.startsWith('data:')) {
// Per SSE spec, multiple `data:` lines in one event are
// joined by '\n', not concatenated. Our JSON-encoded entries
// never contain unescaped LF, but we follow the spec to
// future-proof against a field that emits raw bytes.
const piece = line.slice(5).replace(/^ /, '')
dataLine += dataLine ? '\n' + piece : piece
}
}
if (!dataLine) continue
try {
const parsed = JSON.parse(dataLine) as OpsLogEntry
opts.onEntry(parsed)
} catch {
// skip malformed payload — server uses well-formed JSON, this is defensive
}
}
}
} catch (e: any) {
if (!closed) {
opts.onStatus?.('error')
opts.onError?.(e instanceof Error ? e : new Error(String(e)))
}
} finally {
if (!closed) opts.onStatus?.('closed')
}
}
void run()
return { close }
}

View File

@ -718,7 +718,6 @@ const adminNavItems = computed((): NavItem[] => {
const baseItems: NavItem[] = [
{ path: '/admin/dashboard', label: t('nav.dashboard'), icon: DashboardIcon },
{ path: '/admin/ops', label: t('nav.ops'), icon: ChartIcon, featureFlag: flagOpsMonitoring },
{ path: '/admin/ops/logs', label: t('nav.opsLogStream'), icon: ChartIcon, featureFlag: flagOpsMonitoring },
{ path: '/admin/users', label: t('nav.users'), icon: UsersIcon, hideInSimpleMode: true },
{ path: '/admin/groups', label: t('nav.groups'), icon: FolderIcon, hideInSimpleMode: true },
{

View File

@ -367,7 +367,6 @@ export default {
proxies: 'Proxies',
redeemCodes: 'Redeem Codes',
ops: 'Ops',
opsLogStream: 'Live Logs',
promoCodes: 'Promo Codes',
settings: 'Settings',
myAccount: 'My Account',
@ -4466,39 +4465,6 @@ export default {
ops: {
title: 'Ops Monitoring',
description: 'Operational monitoring and troubleshooting',
logStream: {
title: 'Live Request Logs',
description: 'Subscribe to a real-time SSE stream of every gateway request',
empty: 'No logs yet — waiting for new requests or adjust filters',
loadError: 'Failed to load live log stream',
pause: 'Pause',
resume: 'Resume',
clearLogs: 'Clear',
scrollToBottom: 'Jump to latest',
statusBar: 'Showing {shown} · {pending} queued while paused · {dropped} dropped',
status: {
live: 'Live',
connecting: 'Connecting',
closed: 'Disconnected',
error: 'Error'
},
filter: {
allStatuses: 'All statuses',
modelPlaceholder: 'Model (exact)',
accountIdPlaceholder: 'Account ID',
minLatencyMs: 'Min latency ms'
},
col: {
time: 'Time',
method: 'Method',
path: 'Path',
status: 'Status',
latency: 'Latency',
model: 'Model',
accountId: 'Account',
error: 'Error'
}
},
// Dashboard
systemHealth: 'System Health',
overview: 'Overview',

View File

@ -367,7 +367,6 @@ export default {
proxies: 'IP管理',
redeemCodes: '兑换码',
ops: '运维监控',
opsLogStream: '实时日志',
promoCodes: '优惠码',
settings: '系统设置',
myAccount: '我的账户',
@ -4622,39 +4621,6 @@ export default {
ops: {
title: '运维监控',
description: '运维监控与排障',
logStream: {
title: '实时请求日志',
description: '订阅每条网关请求的 SSE 实时流',
empty: '暂无日志,请等待新请求或调整过滤器',
loadError: '加载实时日志失败',
pause: '暂停',
resume: '继续',
clearLogs: '清空',
scrollToBottom: '回到最新',
statusBar: '当前显示 {shown} 条,暂停队列 {pending} 条,已丢弃 {dropped} 条',
status: {
live: '实时',
connecting: '连接中',
closed: '已断开',
error: '错误'
},
filter: {
allStatuses: '全部状态',
modelPlaceholder: '模型 (精确匹配)',
accountIdPlaceholder: 'Account ID',
minLatencyMs: '最小延迟 ms'
},
col: {
time: '时间',
method: '方法',
path: '路径',
status: '状态',
latency: '延迟',
model: '模型',
accountId: '账号',
error: '错误'
}
},
// Dashboard
systemHealth: '系统健康',
overview: '概览',

View File

@ -401,18 +401,6 @@ const routes: RouteRecordRaw[] = [
descriptionKey: 'admin.ops.description'
}
},
{
path: '/admin/ops/logs',
name: 'AdminOpsLogStream',
component: () => import('@/views/admin/ops/OpsLogStreamView.vue'),
meta: {
requiresAuth: true,
requiresAdmin: true,
title: 'Live Log Stream',
titleKey: 'admin.ops.logStream.title',
descriptionKey: 'admin.ops.logStream.description'
}
},
{
path: '/admin/users',
name: 'AdminUsers',

View File

@ -1,390 +0,0 @@
<template>
<AppLayout>
<TablePageLayout>
<template #filters>
<div class="flex flex-wrap items-center justify-between gap-3">
<div class="flex items-center gap-2">
<h2 class="text-lg font-semibold text-gray-900 dark:text-gray-100">
{{ t('admin.ops.logStream.title') }}
</h2>
<span
:class="[
'inline-flex items-center gap-1 rounded-full px-2 py-0.5 text-xs font-medium',
statusBadge.cls
]"
:aria-live="'polite'"
>
<span class="h-1.5 w-1.5 rounded-full" :class="statusBadge.dot" />
{{ t(statusBadge.labelKey) }}
</span>
</div>
<div class="flex flex-wrap items-center gap-2">
<select v-model.number="filter.min_status" class="input h-8 w-auto text-sm">
<option :value="0">{{ t('admin.ops.logStream.filter.allStatuses') }}</option>
<option :value="200"> 200</option>
<option :value="400"> 400</option>
<option :value="500"> 500</option>
</select>
<input
v-model.trim="filter.model"
type="text"
:placeholder="t('admin.ops.logStream.filter.modelPlaceholder')"
class="input h-8 w-40 text-sm"
/>
<input
v-model.number="filter.account_id"
type="number"
min="0"
:placeholder="t('admin.ops.logStream.filter.accountIdPlaceholder')"
class="input h-8 w-32 text-sm"
/>
<input
v-model.number="filter.min_latency_ms"
type="number"
min="0"
:placeholder="t('admin.ops.logStream.filter.minLatencyMs')"
class="input h-8 w-32 text-sm"
/>
<button type="button" class="btn btn-secondary h-8 px-3 text-sm" @click="applyFilter()">
{{ t('common.apply') }}
</button>
<button
type="button"
class="btn btn-secondary h-8 px-3 text-sm"
:class="{ 'text-amber-600': paused }"
@click="togglePause"
>
{{ paused ? t('admin.ops.logStream.resume') : t('admin.ops.logStream.pause') }}
</button>
<button type="button" class="btn btn-secondary h-8 px-3 text-sm" @click="entries = []">
{{ t('admin.ops.logStream.clearLogs') }}
</button>
</div>
</div>
</template>
<template #content>
<div
ref="logBox"
class="h-[calc(100vh-280px)] overflow-y-auto rounded border border-gray-200 bg-white text-xs dark:border-dark-600 dark:bg-dark-800"
@scroll="onScroll"
>
<table class="min-w-full divide-y divide-gray-200 dark:divide-dark-600">
<thead class="sticky top-0 z-10 bg-gray-50 dark:bg-dark-700">
<tr>
<th class="px-2 py-1 text-left font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.time') }}
</th>
<th class="px-2 py-1 text-left font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.method') }}
</th>
<th class="px-2 py-1 text-left font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.path') }}
</th>
<th class="px-2 py-1 text-right font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.status') }}
</th>
<th class="px-2 py-1 text-right font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.latency') }}
</th>
<th class="px-2 py-1 text-left font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.model') }}
</th>
<th class="px-2 py-1 text-right font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.accountId') }}
</th>
<th class="px-2 py-1 text-left font-medium text-gray-500 dark:text-gray-400">
{{ t('admin.ops.logStream.col.error') }}
</th>
</tr>
</thead>
<tbody class="divide-y divide-gray-100 dark:divide-dark-700">
<tr v-if="entries.length === 0">
<td colspan="8" class="px-2 py-6 text-center text-gray-400">
{{ t('admin.ops.logStream.empty') }}
</td>
</tr>
<tr
v-for="row in entries"
:key="row.key"
class="hover:bg-gray-50 dark:hover:bg-dark-700"
>
<td class="whitespace-nowrap px-2 py-1 font-mono text-gray-500 dark:text-gray-400">
{{ formatTime(row.entry.time) }}
</td>
<td class="whitespace-nowrap px-2 py-1 font-mono">{{ row.entry.method || '—' }}</td>
<td class="px-2 py-1 font-mono text-gray-600 dark:text-gray-300">{{ row.entry.path || '—' }}</td>
<td class="whitespace-nowrap px-2 py-1 text-right font-mono" :class="statusColor(row.entry.status)">
{{ row.entry.status || '—' }}
</td>
<td class="whitespace-nowrap px-2 py-1 text-right font-mono" :class="latencyColor(row.entry.latency_ms)">
{{ row.entry.latency_ms ?? 0 }}ms
</td>
<td class="whitespace-nowrap px-2 py-1">{{ row.entry.model || '—' }}</td>
<td class="whitespace-nowrap px-2 py-1 text-right font-mono">{{ row.entry.account_id || '—' }}</td>
<td class="px-2 py-1 text-red-500 dark:text-red-400">{{ row.entry.error_message || '' }}</td>
</tr>
</tbody>
</table>
</div>
<div class="mt-2 flex items-center justify-between text-xs text-gray-500 dark:text-gray-400">
<span>
{{ t('admin.ops.logStream.statusBar', {
shown: entries.length,
pending: pending.length,
dropped: stats.droppedTotal
}) }}
</span>
<button
v-if="!autoScroll"
type="button"
class="rounded px-2 py-0.5 text-primary-600 hover:bg-primary-50 dark:hover:bg-dark-700"
@click="scrollToBottom"
>
{{ t('admin.ops.logStream.scrollToBottom') }}
</button>
</div>
</template>
</TablePageLayout>
</AppLayout>
</template>
<script setup lang="ts">
import { ref, reactive, computed, nextTick, onMounted, onBeforeUnmount } from 'vue'
import { useI18n } from 'vue-i18n'
import AppLayout from '@/components/layout/AppLayout.vue'
import TablePageLayout from '@/components/layout/TablePageLayout.vue'
import { useAppStore } from '@/stores/app'
import { opsAPI, type OpsLogEntry, type OpsLogFilter, type LogStreamHandle } from '@/api/admin/ops'
const { t } = useI18n()
const appStore = useAppStore()
const MAX_BUFFERED = 2000
const RECONNECT_BASE_MS = 1000
const RECONNECT_MAX_MS = 30_000
interface KeyedEntry {
key: number
entry: OpsLogEntry
}
const filter = reactive<OpsLogFilter>({
min_status: 0,
model: '',
account_id: 0,
min_latency_ms: 0
})
let nextEntryKey = 0
function wrap(entry: OpsLogEntry): KeyedEntry {
return { key: ++nextEntryKey, entry }
}
const entries = ref<KeyedEntry[]>([])
const paused = ref(false)
const pending = ref<KeyedEntry[]>([])
const stats = reactive({ droppedTotal: 0 })
const status = ref<'connecting' | 'live' | 'closed' | 'error'>('connecting')
const autoScroll = ref(true)
const logBox = ref<HTMLElement | null>(null)
const statusBadge = computed(() => {
switch (status.value) {
case 'live':
return {
cls: 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900 dark:text-emerald-300',
dot: 'bg-emerald-500',
labelKey: 'admin.ops.logStream.status.live'
}
case 'connecting':
return {
cls: 'bg-amber-100 text-amber-700 dark:bg-amber-900 dark:text-amber-300',
dot: 'bg-amber-500 animate-pulse',
labelKey: 'admin.ops.logStream.status.connecting'
}
case 'error':
return {
cls: 'bg-red-100 text-red-700 dark:bg-red-900 dark:text-red-300',
dot: 'bg-red-500',
labelKey: 'admin.ops.logStream.status.error'
}
default:
return {
cls: 'bg-gray-100 text-gray-600 dark:bg-dark-600 dark:text-gray-300',
dot: 'bg-gray-400',
labelKey: 'admin.ops.logStream.status.closed'
}
}
})
let handle: LogStreamHandle | null = null
let primeAbort: AbortController | null = null
let reconnectTimer: ReturnType<typeof setTimeout> | null = null
let reconnectAttempt = 0
let mounted = true
function formatTime(timeStr: string | undefined): string {
if (!timeStr) return '—'
try {
const d = new Date(timeStr)
return d.toLocaleTimeString()
} catch {
return timeStr
}
}
function statusColor(s: number): string {
if (!s) return ''
if (s >= 500) return 'text-red-600 dark:text-red-400'
if (s >= 400) return 'text-amber-600 dark:text-amber-400'
if (s >= 300) return 'text-blue-600 dark:text-blue-400'
return 'text-emerald-600 dark:text-emerald-400'
}
function latencyColor(ms: number): string {
if (ms >= 5000) return 'text-red-600 dark:text-red-400'
if (ms >= 2000) return 'text-amber-600 dark:text-amber-400'
return 'text-gray-700 dark:text-gray-300'
}
function clampBuffer() {
if (entries.value.length > MAX_BUFFERED) {
entries.value = entries.value.slice(entries.value.length - MAX_BUFFERED)
}
}
function pushEntry(e: OpsLogEntry) {
const wrapped = wrap(e)
if (paused.value) {
pending.value.push(wrapped)
if (pending.value.length > MAX_BUFFERED) pending.value.shift()
return
}
entries.value.push(wrapped)
clampBuffer()
if (autoScroll.value) {
nextTick(() => {
if (!mounted || !logBox.value) return
logBox.value.scrollTop = logBox.value.scrollHeight
})
}
}
function togglePause() {
paused.value = !paused.value
if (!paused.value && pending.value.length > 0) {
entries.value.push(...pending.value)
pending.value = []
clampBuffer()
if (autoScroll.value) {
nextTick(() => {
if (!mounted || !logBox.value) return
logBox.value.scrollTop = logBox.value.scrollHeight
})
}
}
}
function onScroll(e: Event) {
const el = e.target as HTMLElement
// Treat being within 80px of the bottom as "at bottom" so auto-scroll
// resumes naturally when the user catches up.
autoScroll.value = el.scrollHeight - el.scrollTop - el.clientHeight < 80
}
function scrollToBottom() {
if (!logBox.value) return
logBox.value.scrollTop = logBox.value.scrollHeight
autoScroll.value = true
}
async function primeFromRecent() {
// Abort any in-flight priming fetch without this a rapid filter reapply
// can race two fetches and let the older one overwrite newer state.
if (primeAbort) primeAbort.abort()
primeAbort = new AbortController()
const ctrl = primeAbort
try {
const resp = await opsAPI.getRecentOpsLogs(buildFilter(), 200)
if (!mounted || ctrl !== primeAbort) return
entries.value = resp.entries.map(wrap)
stats.droppedTotal = resp.dropped_total
} catch (e: any) {
if (!mounted || ctrl !== primeAbort) return
appStore.showError(e?.response?.data?.message || e?.message || t('admin.ops.logStream.loadError'))
}
}
function buildFilter(): OpsLogFilter {
const out: OpsLogFilter = {}
if (filter.min_status && filter.min_status > 0) out.min_status = filter.min_status
if (filter.model && filter.model.trim()) out.model = filter.model.trim()
if (filter.account_id && filter.account_id > 0) out.account_id = filter.account_id
if (filter.min_latency_ms && filter.min_latency_ms > 0) out.min_latency_ms = filter.min_latency_ms
return out
}
function applyFilter() {
reconnectAttempt = 0
reconnect()
}
function scheduleReconnect() {
if (!mounted) return
if (reconnectTimer) return
const delay = Math.min(RECONNECT_BASE_MS * 2 ** reconnectAttempt, RECONNECT_MAX_MS)
reconnectAttempt = Math.min(reconnectAttempt + 1, 10)
reconnectTimer = setTimeout(() => {
reconnectTimer = null
if (!mounted) return
reconnect()
}, delay)
}
function reconnect() {
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
closeStream()
void primeFromRecent()
handle = opsAPI.subscribeOpsLogStream(buildFilter(), {
onEntry: pushEntry,
onStatus: s => {
status.value = s
if (s === 'live') {
reconnectAttempt = 0
}
if (s === 'closed' || s === 'error') {
scheduleReconnect()
}
},
onError: e => {
if (!mounted) return
appStore.showError(e.message)
}
})
}
function closeStream() {
if (handle) {
handle.close()
handle = null
}
}
onMounted(() => {
reconnect()
})
onBeforeUnmount(() => {
mounted = false
if (primeAbort) primeAbort.abort()
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
closeStream()
})
</script>