sub2api/backend/internal/handler/admin/ops_ws_requests_handler.go
win d1e2d39c26 feat(viewer): add real-time request stream WebSocket endpoint
Adds GET /api/v1/admin/ops/ws/requests — a fan-out WebSocket that pushes
per-request metadata (method, path, model, account_id, status, latency_ms)
to all connected admin clients the moment each gateway dispatch completes.

- service/request_event_bus.go: lock-free pub/sub with non-blocking drop
  when per-subscriber buffer (64 slots) is full; nil-safe Publish
- service/request_event_bus_test.go: 6 tests (basic, fanout, drop, nil, close)
- GatewayHandler: records reqStartTime at entry; defer emits RequestEvent on
  every return; sets status success/error/rate_limited in both Gemini and
  Anthropic dispatch paths
- OpsHandler: accepts *RequestEventBus; wires it to RequestStreamWSHandler
- ops_ws_requests_handler.go: subscribes to bus, pushes JSON per event,
  reuses existing upgrader/conn-limit/ping-pong infrastructure
- Route: ws.GET("/requests", ...) alongside existing /ws/qps
- wire_gen.go: requestEventBus shared between OpsHandler and GatewayHandler
2026-04-29 01:48:15 +08:00

199 lines
5.3 KiB
Go

package admin
import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
type requestStreamWSMessage struct {
Type string `json:"type"`
Data service.RequestEvent `json:"data"`
}
// RequestStreamWSHandler streams real-time request events to WebSocket clients.
// GET /api/v1/admin/ops/ws/requests
//
// Each connected client receives a JSON message per gateway dispatch:
//
// {"type":"request_event","data":{"timestamp":...,"method":"POST","path":"/v1/messages",
// "model":"claude-3-5-sonnet-20241022","account_id":42,"status":"success","latency_ms":1230}}
func (h *OpsHandler) RequestStreamWSHandler(c *gin.Context) {
clientIP := requestClientIP(c.Request)
if h == nil || h.opsService == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"})
return
}
if h.requestEventBus == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "request event bus not initialized"})
return
}
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"})
return
}
closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled")
return
}
if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer func() {
if wsConnCount.Add(-1) == 0 {
scheduleQPSWSIdleStop()
}
}()
if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" {
if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] per-ip limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
return
}
defer releaseOpsWSIPSlot(clientIP)
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] upgrade failed: %v", err)
return
}
defer func() { _ = conn.Close() }()
handleRequestStreamWebSocket(c.Request.Context(), conn, h.requestEventBus)
}
func handleRequestStreamWebSocket(parentCtx context.Context, conn *websocket.Conn, bus *service.RequestEventBus) {
if conn == nil || bus == nil {
return
}
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
subID, eventCh := bus.Subscribe()
defer bus.Unsubscribe(subID)
var closeOnce sync.Once
closeConn := func() {
closeOnce.Do(func() { _ = conn.Close() })
}
closeFrameCh := make(chan []byte, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
conn.SetReadLimit(qpsWSMaxReadBytes)
if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] set read deadline failed: %v", err)
return
}
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait))
})
conn.SetCloseHandler(func(code int, text string) error {
select {
case closeFrameCh <- websocket.FormatCloseMessage(code, text):
default:
}
cancel()
return nil
})
for {
_, _, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] read failed: %v", err)
}
return
}
}
}()
pingTicker := time.NewTicker(qpsWSPingInterval)
defer pingTicker.Stop()
writeWithTimeout := func(messageType int, data []byte) error {
if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil {
return err
}
return conn.WriteMessage(messageType, data)
}
sendClose := func(closeFrame []byte) {
if closeFrame == nil {
closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
}
_ = writeWithTimeout(websocket.CloseMessage, closeFrame)
}
for {
select {
case evt, ok := <-eventCh:
if !ok {
// channel closed by Unsubscribe
sendClose(nil)
closeConn()
wg.Wait()
return
}
msg, err := json.Marshal(requestStreamWSMessage{Type: "request_event", Data: evt})
if err != nil {
continue
}
if err := writeWithTimeout(websocket.TextMessage, msg); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] write failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case <-pingTicker.C:
if err := writeWithTimeout(websocket.PingMessage, nil); err != nil {
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] ping failed: %v", err)
cancel()
closeConn()
wg.Wait()
return
}
case closeFrame := <-closeFrameCh:
sendClose(closeFrame)
closeConn()
wg.Wait()
return
case <-ctx.Done():
var closeFrame []byte
select {
case closeFrame = <-closeFrameCh:
default:
}
sendClose(closeFrame)
closeConn()
wg.Wait()
return
}
}
}