sub2api/backend/internal/service/ops_log_broadcaster_test.go
win de048fad25 chore(wip): save Windsurf/Antigravity/ops customizations before upstream merge
WIP commit保存以下定制工作以便后续合并 upstream v0.1.124-125:
- Windsurf: tier access service, NLU extractor, cold threshold, Google login
- Antigravity: client/oauth 调整
- Ops: log stream handler/broadcaster/middleware, OpsLogStreamView
- Frontend: WindsurfLoginModal Google, GoogleIcon, AccountsView, sidebar/router/i18n
2026-05-09 00:41:19 +08:00

268 lines
6.7 KiB
Go

package service
import (
"sync"
"testing"
"time"
)
func TestOpsLogBroadcaster_FanOutDeliversToMatchingSubscribers(t *testing.T) {
b := NewOpsLogBroadcaster(16)
chHigh, unHigh := b.Subscribe(OpsLogFilter{MinStatus: 500}, 8)
defer unHigh()
chAll, unAll := b.Subscribe(OpsLogFilter{}, 8)
defer unAll()
b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6"})
b.Publish(OpsLogEntry{Status: 503, Model: "kimi-k2.5"})
got200 := receiveOrTimeout(t, chAll, 200*time.Millisecond)
got503 := receiveOrTimeout(t, chAll, 200*time.Millisecond)
if got200.Status != 200 || got503.Status != 503 {
t.Fatalf("unexpected fan-out: %d / %d", got200.Status, got503.Status)
}
gotHigh := receiveOrTimeout(t, chHigh, 200*time.Millisecond)
if gotHigh.Status != 503 {
t.Fatalf("filter MinStatus=500 should drop 200, got %d", gotHigh.Status)
}
expectNoMessage(t, chHigh, 50*time.Millisecond)
}
func TestOpsLogBroadcaster_FilterByModelAndAccount(t *testing.T) {
b := NewOpsLogBroadcaster(0)
chKimi, unKimi := b.Subscribe(OpsLogFilter{Model: "kimi-k2.5"}, 4)
defer unKimi()
chAcct, unAcct := b.Subscribe(OpsLogFilter{AccountID: 42}, 4)
defer unAcct()
b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6", AccountID: 1})
b.Publish(OpsLogEntry{Status: 200, Model: "kimi-k2.5", AccountID: 42})
got := receiveOrTimeout(t, chKimi, 200*time.Millisecond)
if got.Model != "kimi-k2.5" {
t.Fatalf("expected kimi entry, got %+v", got)
}
expectNoMessage(t, chKimi, 50*time.Millisecond)
gotA := receiveOrTimeout(t, chAcct, 200*time.Millisecond)
if gotA.AccountID != 42 {
t.Fatalf("expected account 42, got %d", gotA.AccountID)
}
}
func TestOpsLogBroadcaster_NeverBlocksOnSlowSubscriber(t *testing.T) {
b := NewOpsLogBroadcaster(0)
// Subscriber with buffer=1, never reads. After the second Publish,
// the entry must be dropped instead of blocking the publisher.
_, unsub := b.Subscribe(OpsLogFilter{}, 1)
defer unsub()
done := make(chan struct{})
go func() {
for i := 0; i < 100; i++ {
b.Publish(OpsLogEntry{Status: 200})
}
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("publisher blocked on slow subscriber")
}
_, dropped, _ := b.Stats()
if dropped == 0 {
t.Fatal("expected dropped count > 0 when subscriber buffer overflows")
}
}
func TestOpsLogBroadcaster_HistorySnapshot(t *testing.T) {
b := NewOpsLogBroadcaster(3)
for i := 1; i <= 5; i++ {
b.Publish(OpsLogEntry{Status: 200 + i})
}
got := b.Snapshot(OpsLogFilter{}, 0)
if len(got) != 3 {
t.Fatalf("expected ring of 3, got %d", len(got))
}
// Oldest → newest
if got[0].Status != 203 || got[1].Status != 204 || got[2].Status != 205 {
t.Fatalf("expected 203/204/205, got %d/%d/%d", got[0].Status, got[1].Status, got[2].Status)
}
}
func TestOpsLogBroadcaster_HistoryAppliesFilter(t *testing.T) {
b := NewOpsLogBroadcaster(8)
b.Publish(OpsLogEntry{Status: 200})
b.Publish(OpsLogEntry{Status: 500})
b.Publish(OpsLogEntry{Status: 503})
got := b.Snapshot(OpsLogFilter{MinStatus: 500}, 0)
if len(got) != 2 {
t.Fatalf("expected 2 high-status entries, got %d: %+v", len(got), got)
}
}
func TestOpsLogBroadcaster_UnsubscribeIdempotent(t *testing.T) {
b := NewOpsLogBroadcaster(0)
_, unsub := b.Subscribe(OpsLogFilter{}, 1)
unsub()
unsub() // second call must not panic
unsub() // and a third
}
func TestOpsLogBroadcaster_ZeroTimeFilledIn(t *testing.T) {
b := NewOpsLogBroadcaster(2)
b.Publish(OpsLogEntry{Status: 200}) // Time intentionally zero
got := b.Snapshot(OpsLogFilter{}, 0)
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
if got[0].Time.IsZero() {
t.Fatal("Publish should populate zero Time with time.Now()")
}
}
func TestOpsLogBroadcaster_ConcurrentSafe(t *testing.T) {
b := NewOpsLogBroadcaster(64)
// Spin a few subscribers and producers; rely on -race to surface
// any concurrency bugs in the fan-out path.
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
ch, unsub := b.Subscribe(OpsLogFilter{}, 32)
wg.Add(1)
go func() {
defer wg.Done()
defer unsub()
deadline := time.Now().Add(200 * time.Millisecond)
for time.Now().Before(deadline) {
select {
case <-ch:
case <-time.After(5 * time.Millisecond):
}
}
}()
}
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 200; j++ {
b.Publish(OpsLogEntry{Status: 200, Model: "x"})
}
}()
}
wg.Wait()
pub, _, _ := b.Stats()
if pub != 800 {
t.Fatalf("expected 800 publishes, got %d", pub)
}
}
// TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic exercises the exact
// race the audit identified: a Publish goroutine has snapped a subscription
// pointer while another goroutine unsubscribes (close(ch)) the moment before
// the send. Without the closed-flag guard in Publish, this races into
// "send on closed channel" and panics. With the guard, Publish observes
// closed=true and skips the send. Run with -race.
func TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic(t *testing.T) {
b := NewOpsLogBroadcaster(0)
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 200; j++ {
ch, unsub := b.Subscribe(OpsLogFilter{}, 1)
// Drain non-blockingly until the next publish lands.
done := make(chan struct{})
go func() {
defer close(done)
timer := time.NewTimer(50 * time.Millisecond)
defer timer.Stop()
for {
select {
case <-ch:
case <-timer.C:
return
}
}
}()
b.Publish(OpsLogEntry{Status: 200})
unsub()
b.Publish(OpsLogEntry{Status: 200}) // post-unsub publish must not panic
<-done
}
}()
}
wg.Wait()
}
// TestOpsLogBroadcaster_SnapshotConcurrentWithPublish ensures Snapshot is
// safe under concurrent Publish (verifies subsMu vs historyMu coexistence).
func TestOpsLogBroadcaster_SnapshotConcurrentWithPublish(t *testing.T) {
b := NewOpsLogBroadcaster(32)
stop := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
b.Publish(OpsLogEntry{Status: 200})
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 200; i++ {
_ = b.Snapshot(OpsLogFilter{}, 0)
}
}()
time.Sleep(50 * time.Millisecond)
close(stop)
wg.Wait()
}
// helpers --------------------------------------------------------------
func receiveOrTimeout(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) OpsLogEntry {
t.Helper()
select {
case e := <-ch:
return e
case <-time.After(d):
t.Fatalf("timeout waiting for entry after %s", d)
}
return OpsLogEntry{}
}
func expectNoMessage(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) {
t.Helper()
select {
case e := <-ch:
t.Fatalf("unexpected message: %+v", e)
case <-time.After(d):
}
}