package admin import ( "context" "encoding/json" "net/http" "sync" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/service" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) type requestStreamWSMessage struct { Type string `json:"type"` Data service.RequestEvent `json:"data"` } // RequestStreamWSHandler streams real-time request events to WebSocket clients. // GET /api/v1/admin/ops/ws/requests // // Each connected client receives a JSON message per gateway dispatch: // // {"type":"request_event","data":{"timestamp":...,"method":"POST","path":"/v1/messages", // "model":"claude-3-5-sonnet-20241022","account_id":42,"status":"success","latency_ms":1230}} func (h *OpsHandler) RequestStreamWSHandler(c *gin.Context) { clientIP := requestClientIP(c.Request) if h == nil || h.opsService == nil { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"}) return } if h.requestEventBus == nil { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "request event bus not initialized"}) return } if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"}) return } closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled") return } if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns) c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"}) return } defer func() { if wsConnCount.Add(-1) == 0 { scheduleQPSWSIdleStop() } }() if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" { if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] per-ip limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP) c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"}) return } defer releaseOpsWSIPSlot(clientIP) } conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] upgrade failed: %v", err) return } defer func() { _ = conn.Close() }() handleRequestStreamWebSocket(c.Request.Context(), conn, h.requestEventBus) } func handleRequestStreamWebSocket(parentCtx context.Context, conn *websocket.Conn, bus *service.RequestEventBus) { if conn == nil || bus == nil { return } ctx, cancel := context.WithCancel(parentCtx) defer cancel() subID, eventCh := bus.Subscribe() defer bus.Unsubscribe(subID) var closeOnce sync.Once closeConn := func() { closeOnce.Do(func() { _ = conn.Close() }) } closeFrameCh := make(chan []byte, 1) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() defer cancel() conn.SetReadLimit(qpsWSMaxReadBytes) if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] set read deadline failed: %v", err) return } conn.SetPongHandler(func(string) error { return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)) }) conn.SetCloseHandler(func(code int, text string) error { select { case closeFrameCh <- websocket.FormatCloseMessage(code, text): default: } cancel() return nil }) for { _, _, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] read failed: %v", err) } return } } }() pingTicker := time.NewTicker(qpsWSPingInterval) defer pingTicker.Stop() writeWithTimeout := func(messageType int, data []byte) error { if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil { return err } return conn.WriteMessage(messageType, data) } sendClose := func(closeFrame []byte) { if closeFrame == nil { closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") } _ = writeWithTimeout(websocket.CloseMessage, closeFrame) } for { select { case evt, ok := <-eventCh: if !ok { // channel closed by Unsubscribe sendClose(nil) closeConn() wg.Wait() return } msg, err := json.Marshal(requestStreamWSMessage{Type: "request_event", Data: evt}) if err != nil { continue } if err := writeWithTimeout(websocket.TextMessage, msg); err != nil { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] write failed: %v", err) cancel() closeConn() wg.Wait() return } case <-pingTicker.C: if err := writeWithTimeout(websocket.PingMessage, nil); err != nil { logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] ping failed: %v", err) cancel() closeConn() wg.Wait() return } case closeFrame := <-closeFrameCh: sendClose(closeFrame) closeConn() wg.Wait() return case <-ctx.Done(): var closeFrame []byte select { case closeFrame = <-closeFrameCh: default: } sendClose(closeFrame) closeConn() wg.Wait() return } } }