package service import ( "sync" "testing" "time" ) func TestOpsLogBroadcaster_FanOutDeliversToMatchingSubscribers(t *testing.T) { b := NewOpsLogBroadcaster(16) chHigh, unHigh := b.Subscribe(OpsLogFilter{MinStatus: 500}, 8) defer unHigh() chAll, unAll := b.Subscribe(OpsLogFilter{}, 8) defer unAll() b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6"}) b.Publish(OpsLogEntry{Status: 503, Model: "kimi-k2.5"}) got200 := receiveOrTimeout(t, chAll, 200*time.Millisecond) got503 := receiveOrTimeout(t, chAll, 200*time.Millisecond) if got200.Status != 200 || got503.Status != 503 { t.Fatalf("unexpected fan-out: %d / %d", got200.Status, got503.Status) } gotHigh := receiveOrTimeout(t, chHigh, 200*time.Millisecond) if gotHigh.Status != 503 { t.Fatalf("filter MinStatus=500 should drop 200, got %d", gotHigh.Status) } expectNoMessage(t, chHigh, 50*time.Millisecond) } func TestOpsLogBroadcaster_FilterByModelAndAccount(t *testing.T) { b := NewOpsLogBroadcaster(0) chKimi, unKimi := b.Subscribe(OpsLogFilter{Model: "kimi-k2.5"}, 4) defer unKimi() chAcct, unAcct := b.Subscribe(OpsLogFilter{AccountID: 42}, 4) defer unAcct() b.Publish(OpsLogEntry{Status: 200, Model: "claude-sonnet-4.6", AccountID: 1}) b.Publish(OpsLogEntry{Status: 200, Model: "kimi-k2.5", AccountID: 42}) got := receiveOrTimeout(t, chKimi, 200*time.Millisecond) if got.Model != "kimi-k2.5" { t.Fatalf("expected kimi entry, got %+v", got) } expectNoMessage(t, chKimi, 50*time.Millisecond) gotA := receiveOrTimeout(t, chAcct, 200*time.Millisecond) if gotA.AccountID != 42 { t.Fatalf("expected account 42, got %d", gotA.AccountID) } } func TestOpsLogBroadcaster_NeverBlocksOnSlowSubscriber(t *testing.T) { b := NewOpsLogBroadcaster(0) // Subscriber with buffer=1, never reads. After the second Publish, // the entry must be dropped instead of blocking the publisher. _, unsub := b.Subscribe(OpsLogFilter{}, 1) defer unsub() done := make(chan struct{}) go func() { for i := 0; i < 100; i++ { b.Publish(OpsLogEntry{Status: 200}) } close(done) }() select { case <-done: case <-time.After(2 * time.Second): t.Fatal("publisher blocked on slow subscriber") } _, dropped, _ := b.Stats() if dropped == 0 { t.Fatal("expected dropped count > 0 when subscriber buffer overflows") } } func TestOpsLogBroadcaster_HistorySnapshot(t *testing.T) { b := NewOpsLogBroadcaster(3) for i := 1; i <= 5; i++ { b.Publish(OpsLogEntry{Status: 200 + i}) } got := b.Snapshot(OpsLogFilter{}, 0) if len(got) != 3 { t.Fatalf("expected ring of 3, got %d", len(got)) } // Oldest → newest if got[0].Status != 203 || got[1].Status != 204 || got[2].Status != 205 { t.Fatalf("expected 203/204/205, got %d/%d/%d", got[0].Status, got[1].Status, got[2].Status) } } func TestOpsLogBroadcaster_HistoryAppliesFilter(t *testing.T) { b := NewOpsLogBroadcaster(8) b.Publish(OpsLogEntry{Status: 200}) b.Publish(OpsLogEntry{Status: 500}) b.Publish(OpsLogEntry{Status: 503}) got := b.Snapshot(OpsLogFilter{MinStatus: 500}, 0) if len(got) != 2 { t.Fatalf("expected 2 high-status entries, got %d: %+v", len(got), got) } } func TestOpsLogBroadcaster_UnsubscribeIdempotent(t *testing.T) { b := NewOpsLogBroadcaster(0) _, unsub := b.Subscribe(OpsLogFilter{}, 1) unsub() unsub() // second call must not panic unsub() // and a third } func TestOpsLogBroadcaster_ZeroTimeFilledIn(t *testing.T) { b := NewOpsLogBroadcaster(2) b.Publish(OpsLogEntry{Status: 200}) // Time intentionally zero got := b.Snapshot(OpsLogFilter{}, 0) if len(got) != 1 { t.Fatalf("expected 1 entry, got %d", len(got)) } if got[0].Time.IsZero() { t.Fatal("Publish should populate zero Time with time.Now()") } } func TestOpsLogBroadcaster_ConcurrentSafe(t *testing.T) { b := NewOpsLogBroadcaster(64) // Spin a few subscribers and producers; rely on -race to surface // any concurrency bugs in the fan-out path. var wg sync.WaitGroup for i := 0; i < 4; i++ { ch, unsub := b.Subscribe(OpsLogFilter{}, 32) wg.Add(1) go func() { defer wg.Done() defer unsub() deadline := time.Now().Add(200 * time.Millisecond) for time.Now().Before(deadline) { select { case <-ch: case <-time.After(5 * time.Millisecond): } } }() } for i := 0; i < 4; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 200; j++ { b.Publish(OpsLogEntry{Status: 200, Model: "x"}) } }() } wg.Wait() pub, _, _ := b.Stats() if pub != 800 { t.Fatalf("expected 800 publishes, got %d", pub) } } // TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic exercises the exact // race the audit identified: a Publish goroutine has snapped a subscription // pointer while another goroutine unsubscribes (close(ch)) the moment before // the send. Without the closed-flag guard in Publish, this races into // "send on closed channel" and panics. With the guard, Publish observes // closed=true and skips the send. Run with -race. func TestOpsLogBroadcaster_ConcurrentUnsubscribeNoPanic(t *testing.T) { b := NewOpsLogBroadcaster(0) var wg sync.WaitGroup for i := 0; i < 8; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 200; j++ { ch, unsub := b.Subscribe(OpsLogFilter{}, 1) // Drain non-blockingly until the next publish lands. done := make(chan struct{}) go func() { defer close(done) timer := time.NewTimer(50 * time.Millisecond) defer timer.Stop() for { select { case <-ch: case <-timer.C: return } } }() b.Publish(OpsLogEntry{Status: 200}) unsub() b.Publish(OpsLogEntry{Status: 200}) // post-unsub publish must not panic <-done } }() } wg.Wait() } // TestOpsLogBroadcaster_SnapshotConcurrentWithPublish ensures Snapshot is // safe under concurrent Publish (verifies subsMu vs historyMu coexistence). func TestOpsLogBroadcaster_SnapshotConcurrentWithPublish(t *testing.T) { b := NewOpsLogBroadcaster(32) stop := make(chan struct{}) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for { select { case <-stop: return default: b.Publish(OpsLogEntry{Status: 200}) } } }() go func() { defer wg.Done() for i := 0; i < 200; i++ { _ = b.Snapshot(OpsLogFilter{}, 0) } }() time.Sleep(50 * time.Millisecond) close(stop) wg.Wait() } // helpers -------------------------------------------------------------- func receiveOrTimeout(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) OpsLogEntry { t.Helper() select { case e := <-ch: return e case <-time.After(d): t.Fatalf("timeout waiting for entry after %s", d) } return OpsLogEntry{} } func expectNoMessage(t *testing.T, ch <-chan OpsLogEntry, d time.Duration) { t.Helper() select { case e := <-ch: t.Fatalf("unexpected message: %+v", e) case <-time.After(d): } }