Adds GET /api/v1/admin/ops/ws/requests — a fan-out WebSocket that pushes
per-request metadata (method, path, model, account_id, status, latency_ms)
to all connected admin clients the moment each gateway dispatch completes.
- service/request_event_bus.go: lock-free pub/sub with non-blocking drop
when per-subscriber buffer (64 slots) is full; nil-safe Publish
- service/request_event_bus_test.go: 6 tests (basic, fanout, drop, nil, close)
- GatewayHandler: records reqStartTime at entry; defer emits RequestEvent on
every return; sets status success/error/rate_limited in both Gemini and
Anthropic dispatch paths
- OpsHandler: accepts *RequestEventBus; wires it to RequestStreamWSHandler
- ops_ws_requests_handler.go: subscribes to bus, pushes JSON per event,
reuses existing upgrader/conn-limit/ping-pong infrastructure
- Route: ws.GET("/requests", ...) alongside existing /ws/qps
- wire_gen.go: requestEventBus shared between OpsHandler and GatewayHandler
199 lines
5.3 KiB
Go
199 lines
5.3 KiB
Go
package admin
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
type requestStreamWSMessage struct {
|
|
Type string `json:"type"`
|
|
Data service.RequestEvent `json:"data"`
|
|
}
|
|
|
|
// RequestStreamWSHandler streams real-time request events to WebSocket clients.
|
|
// GET /api/v1/admin/ops/ws/requests
|
|
//
|
|
// Each connected client receives a JSON message per gateway dispatch:
|
|
//
|
|
// {"type":"request_event","data":{"timestamp":...,"method":"POST","path":"/v1/messages",
|
|
// "model":"claude-3-5-sonnet-20241022","account_id":42,"status":"success","latency_ms":1230}}
|
|
func (h *OpsHandler) RequestStreamWSHandler(c *gin.Context) {
|
|
clientIP := requestClientIP(c.Request)
|
|
|
|
if h == nil || h.opsService == nil {
|
|
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "ops service not initialized"})
|
|
return
|
|
}
|
|
if h.requestEventBus == nil {
|
|
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "request event bus not initialized"})
|
|
return
|
|
}
|
|
|
|
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "ops realtime monitoring is disabled"})
|
|
return
|
|
}
|
|
closeWS(conn, opsWSCloseRealtimeDisabled, "realtime_disabled")
|
|
return
|
|
}
|
|
|
|
if !tryAcquireOpsWSTotalSlot(opsWSLimits.MaxConns) {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] connection limit reached: %d/%d", wsConnCount.Load(), opsWSLimits.MaxConns)
|
|
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
|
|
return
|
|
}
|
|
defer func() {
|
|
if wsConnCount.Add(-1) == 0 {
|
|
scheduleQPSWSIdleStop()
|
|
}
|
|
}()
|
|
|
|
if opsWSLimits.MaxConnsPerIP > 0 && clientIP != "" {
|
|
if !tryAcquireOpsWSIPSlot(clientIP, opsWSLimits.MaxConnsPerIP) {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] per-ip limit reached: ip=%s limit=%d", clientIP, opsWSLimits.MaxConnsPerIP)
|
|
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "too many connections"})
|
|
return
|
|
}
|
|
defer releaseOpsWSIPSlot(clientIP)
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] upgrade failed: %v", err)
|
|
return
|
|
}
|
|
defer func() { _ = conn.Close() }()
|
|
|
|
handleRequestStreamWebSocket(c.Request.Context(), conn, h.requestEventBus)
|
|
}
|
|
|
|
func handleRequestStreamWebSocket(parentCtx context.Context, conn *websocket.Conn, bus *service.RequestEventBus) {
|
|
if conn == nil || bus == nil {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(parentCtx)
|
|
defer cancel()
|
|
|
|
subID, eventCh := bus.Subscribe()
|
|
defer bus.Unsubscribe(subID)
|
|
|
|
var closeOnce sync.Once
|
|
closeConn := func() {
|
|
closeOnce.Do(func() { _ = conn.Close() })
|
|
}
|
|
|
|
closeFrameCh := make(chan []byte, 1)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
defer cancel()
|
|
|
|
conn.SetReadLimit(qpsWSMaxReadBytes)
|
|
if err := conn.SetReadDeadline(time.Now().Add(qpsWSPongWait)); err != nil {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] set read deadline failed: %v", err)
|
|
return
|
|
}
|
|
conn.SetPongHandler(func(string) error {
|
|
return conn.SetReadDeadline(time.Now().Add(qpsWSPongWait))
|
|
})
|
|
conn.SetCloseHandler(func(code int, text string) error {
|
|
select {
|
|
case closeFrameCh <- websocket.FormatCloseMessage(code, text):
|
|
default:
|
|
}
|
|
cancel()
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, _, err := conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] read failed: %v", err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
pingTicker := time.NewTicker(qpsWSPingInterval)
|
|
defer pingTicker.Stop()
|
|
|
|
writeWithTimeout := func(messageType int, data []byte) error {
|
|
if err := conn.SetWriteDeadline(time.Now().Add(qpsWSWriteTimeout)); err != nil {
|
|
return err
|
|
}
|
|
return conn.WriteMessage(messageType, data)
|
|
}
|
|
|
|
sendClose := func(closeFrame []byte) {
|
|
if closeFrame == nil {
|
|
closeFrame = websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
|
|
}
|
|
_ = writeWithTimeout(websocket.CloseMessage, closeFrame)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case evt, ok := <-eventCh:
|
|
if !ok {
|
|
// channel closed by Unsubscribe
|
|
sendClose(nil)
|
|
closeConn()
|
|
wg.Wait()
|
|
return
|
|
}
|
|
msg, err := json.Marshal(requestStreamWSMessage{Type: "request_event", Data: evt})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if err := writeWithTimeout(websocket.TextMessage, msg); err != nil {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] write failed: %v", err)
|
|
cancel()
|
|
closeConn()
|
|
wg.Wait()
|
|
return
|
|
}
|
|
|
|
case <-pingTicker.C:
|
|
if err := writeWithTimeout(websocket.PingMessage, nil); err != nil {
|
|
logger.LegacyPrintf("handler.admin.ops_ws_requests", "[OpsWSReq] ping failed: %v", err)
|
|
cancel()
|
|
closeConn()
|
|
wg.Wait()
|
|
return
|
|
}
|
|
|
|
case closeFrame := <-closeFrameCh:
|
|
sendClose(closeFrame)
|
|
closeConn()
|
|
wg.Wait()
|
|
return
|
|
|
|
case <-ctx.Done():
|
|
var closeFrame []byte
|
|
select {
|
|
case closeFrame = <-closeFrameCh:
|
|
default:
|
|
}
|
|
sendClose(closeFrame)
|
|
closeConn()
|
|
wg.Wait()
|
|
return
|
|
}
|
|
}
|
|
}
|