sub2api/backend/internal/service/request_event_bus_test.go
win d1e2d39c26 feat(viewer): add real-time request stream WebSocket endpoint
Adds GET /api/v1/admin/ops/ws/requests — a fan-out WebSocket that pushes
per-request metadata (method, path, model, account_id, status, latency_ms)
to all connected admin clients the moment each gateway dispatch completes.

- service/request_event_bus.go: lock-free pub/sub with non-blocking drop
  when per-subscriber buffer (64 slots) is full; nil-safe Publish
- service/request_event_bus_test.go: 6 tests (basic, fanout, close, unsubscribed, drop, nil)
- GatewayHandler: records reqStartTime at entry; defer emits RequestEvent on
  every return; sets status success/error/rate_limited in both Gemini and
  Anthropic dispatch paths
- OpsHandler: accepts *RequestEventBus; wires it to RequestStreamWSHandler
- ops_ws_requests_handler.go: subscribes to bus, pushes JSON per event,
  reuses existing upgrader/conn-limit/ping-pong infrastructure
- Route: ws.GET("/requests", ...) alongside existing /ws/qps
- wire_gen.go: requestEventBus shared between OpsHandler and GatewayHandler
2026-04-29 01:48:15 +08:00

101 lines
2.3 KiB
Go

package service
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestRequestEventBus_PublishToSubscriber checks that an event published on
// the bus arrives unmodified on the channel of a single subscriber.
func TestRequestEventBus_PublishToSubscriber(t *testing.T) {
	eventBus := NewRequestEventBus()
	subID, events := eventBus.Subscribe()
	defer eventBus.Unsubscribe(subID)

	want := RequestEvent{Model: "claude-3", Status: "success", LatencyMS: 100}
	eventBus.Publish(want)

	// Bound the wait so a lost event fails the test instead of hanging it.
	timeout := time.After(time.Second)
	select {
	case <-timeout:
		t.Fatal("timed out waiting for event")
	case received := <-events:
		assert.Equal(t, want, received)
	}
}
// TestRequestEventBus_MultipleSubscribers checks fan-out: a single Publish
// must be delivered to every channel that is subscribed at that moment.
func TestRequestEventBus_MultipleSubscribers(t *testing.T) {
	eventBus := NewRequestEventBus()
	firstID, firstCh := eventBus.Subscribe()
	secondID, secondCh := eventBus.Subscribe()
	defer eventBus.Unsubscribe(firstID)
	defer eventBus.Unsubscribe(secondID)

	want := RequestEvent{Model: "claude-3", Status: "error"}
	eventBus.Publish(want)

	// Each subscriber gets its own one-second budget to observe the event.
	subscribers := []<-chan RequestEvent{firstCh, secondCh}
	for idx := range subscribers {
		select {
		case received := <-subscribers[idx]:
			assert.Equal(t, want, received)
		case <-time.After(time.Second):
			t.Fatal("timed out waiting for event on one subscriber")
		}
	}
}
// TestRequestEventBus_UnsubscribeClosesChannel checks that Unsubscribe closes
// the channel that Subscribe handed out, so receivers can detect removal.
func TestRequestEventBus_UnsubscribeClosesChannel(t *testing.T) {
	bus := NewRequestEventBus()
	id, ch := bus.Subscribe()
	bus.Unsubscribe(id)

	// A receive on a closed channel returns immediately with ok == false.
	// Guard it with a timeout: if Unsubscribe ever stops closing the channel,
	// a bare receive would block until the global `go test` timeout instead
	// of failing this test promptly with a useful message.
	select {
	case _, ok := <-ch:
		assert.False(t, ok, "channel should be closed after Unsubscribe")
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for channel to be closed after Unsubscribe")
	}
}
// TestRequestEventBus_UnsubscribedMissesEvents checks that publishing after a
// subscriber has been removed neither panics nor delivers the event to the
// removed subscriber's channel.
func TestRequestEventBus_UnsubscribedMissesEvents(t *testing.T) {
	bus := NewRequestEventBus()
	id, ch := bus.Subscribe()
	bus.Unsubscribe(id)

	// Publish after unsubscribe should not panic.
	require.NotPanics(t, func() {
		bus.Publish(RequestEvent{Model: "test"})
	})

	// The removed subscriber must not observe the event. The receive is
	// non-blocking: a closed channel yields ok == false, while an open but
	// empty channel falls through to default. Only a delivered event fails.
	select {
	case got, ok := <-ch:
		assert.False(t, ok, "unsubscribed channel received event %+v", got)
	default:
	}
}
// TestRequestEventBus_DropWhenFull checks that Publish does not block when a
// subscriber is not draining its channel: after requestEventBufSize events
// are queued, one more Publish must return promptly and the channel must
// still hold exactly requestEventBufSize events.
func TestRequestEventBus_DropWhenFull(t *testing.T) {
	eventBus := NewRequestEventBus()
	subID, events := eventBus.Subscribe()
	defer eventBus.Unsubscribe(subID)

	// Queue requestEventBufSize events without ever receiving from the channel.
	sample := RequestEvent{Model: "model", Status: "success"}
	for sent := 0; sent < requestEventBufSize; sent++ {
		eventBus.Publish(sample)
	}

	// Publish once more from a separate goroutine so that a blocking Publish
	// shows up as a timeout below rather than hanging the test itself.
	returned := make(chan struct{})
	publishExtra := func() {
		defer close(returned)
		eventBus.Publish(sample)
	}
	go publishExtra()

	timeout := time.After(time.Second)
	select {
	case <-timeout:
		t.Fatal("Publish blocked when buffer was full")
	case <-returned:
	}

	// The extra event must have been dropped, not queued.
	assert.Len(t, events, requestEventBufSize)
}
// TestRequestEventBus_NilSafePublish checks that calling Publish on a nil
// *RequestEventBus does not panic.
func TestRequestEventBus_NilSafePublish(t *testing.T) {
	nilBus := (*RequestEventBus)(nil)
	publishOnNil := func() {
		nilBus.Publish(RequestEvent{Model: "test"})
	}
	require.NotPanics(t, publishOnNil)
}