// Package service exposes domain services. opslog provides a lightweight // in-memory pub/sub for streaming admin log events without persisting them. package service import ( "sync" "sync/atomic" "time" ) // OpsLogEntry is one streamed log event. All fields are optional except Time // — any missing data simply renders as empty in the admin UI. type OpsLogEntry struct { Time time.Time `json:"time"` Method string `json:"method,omitempty"` Path string `json:"path,omitempty"` Status int `json:"status"` LatencyMs int64 `json:"latency_ms"` Model string `json:"model,omitempty"` Stream bool `json:"stream,omitempty"` AccountID int64 `json:"account_id,omitempty"` GroupID int64 `json:"group_id,omitempty"` APIKeyID int64 `json:"api_key_id,omitempty"` UserID int64 `json:"user_id,omitempty"` Turns int `json:"turns,omitempty"` PromptChars int `json:"prompt_chars,omitempty"` ErrorMessage string `json:"error_message,omitempty"` ErrorDetail string `json:"error_detail,omitempty"` UpstreamCode int `json:"upstream_status,omitempty"` } // OpsLogFilter restricts which entries a subscriber receives. Empty fields // match everything; non-empty fields are AND-combined. type OpsLogFilter struct { MinStatus int Model string AccountID int64 GroupID int64 MinLatencyMs int64 } // matches reports whether the entry passes the filter. func (f OpsLogFilter) matches(e *OpsLogEntry) bool { if f.MinStatus > 0 && e.Status < f.MinStatus { return false } if f.Model != "" && e.Model != f.Model { return false } if f.AccountID > 0 && e.AccountID != f.AccountID { return false } if f.GroupID > 0 && e.GroupID != f.GroupID { return false } if f.MinLatencyMs > 0 && e.LatencyMs < f.MinLatencyMs { return false } return true } // OpsLogBroadcaster is a lock-free-ish fan-out broadcaster with a bounded // ring buffer for history (so freshly connected clients can prime their UI) // and per-subscriber non-blocking sends (so a slow client never stalls a // publish on the hot request path). type OpsLogBroadcaster struct { subsMu sync.RWMutex subscribers map[int64]*opsLogSubscription nextID atomic.Int64 historyMu sync.Mutex history []OpsLogEntry histHead int histLen int histCap int // publishedTotal / droppedTotal expose simple ops counters for an // admin dashboard cell. Atomic so callers don't need to lock. publishedTotal atomic.Int64 droppedTotal atomic.Int64 } type opsLogSubscription struct { ch chan OpsLogEntry filter OpsLogFilter // closed is set atomically by unsubscribe() before any cleanup. Publish // reads this flag under subsMu.RLock and skips closed subscriptions // instead of attempting a send-on-closed-channel (which would panic). closed atomic.Bool } // NewOpsLogBroadcaster constructs a broadcaster. historyCap controls how // many recent entries are kept for newly connected clients; 1000 is a sane // default. Pass historyCap<=0 to disable the buffer entirely. func NewOpsLogBroadcaster(historyCap int) *OpsLogBroadcaster { if historyCap < 0 { historyCap = 0 } b := &OpsLogBroadcaster{ subscribers: make(map[int64]*opsLogSubscription), histCap: historyCap, } if historyCap > 0 { b.history = make([]OpsLogEntry, historyCap) } return b } // Publish fans the entry out to every matching subscriber and appends it to // the history buffer. Never blocks: if a subscriber's channel is full, the // entry is dropped for that subscriber and the broadcaster's drop counter // is incremented. Hot-path safe. // // The same entry value is delivered (by value) to all subscribers and to // the ring buffer — no shared mutable pointer is leaked, so subscribers // holding references to past entries cannot observe later mutations. func (b *OpsLogBroadcaster) Publish(entry OpsLogEntry) { if entry.Time.IsZero() { entry.Time = time.Now() } b.publishedTotal.Add(1) b.appendHistory(entry) b.subsMu.RLock() subs := make([]*opsLogSubscription, 0, len(b.subscribers)) for _, s := range b.subscribers { subs = append(subs, s) } b.subsMu.RUnlock() for _, s := range subs { // Skip subscriptions that have been unsubscribed since we snapped // the list. Without this check, a concurrent unsubscribe → close(ch) // would race the send below and panic on send-to-closed-channel. if s.closed.Load() { continue } if !s.filter.matches(&entry) { continue } select { case s.ch <- entry: default: b.droppedTotal.Add(1) } } } // Subscribe registers a new listener. The returned channel is buffered; // callers MUST drain it. Cancel by calling the returned unsubscribe func // (idempotent and safe from any goroutine). // // IMPORTANT: unsubscribe does NOT close the channel. Closing it would race // with concurrent Publish goroutines that may already be holding a snapshot // of the subscription pointer (causing a panic on send-to-closed-channel). // Instead, unsubscribe (a) sets the closed atomic flag — Publish skips sends // to flagged subs — and (b) removes from the subscriber map. Any in-flight // send that slips past the flag check still proceeds harmlessly into the // channel buffer and is garbage-collected with the channel once the caller // drops its reference. Subscribers that need to know the broadcaster is // done with them should rely on the parent ctx, not channel close. func (b *OpsLogBroadcaster) Subscribe(filter OpsLogFilter, bufSize int) (<-chan OpsLogEntry, func()) { if bufSize <= 0 { bufSize = 1024 } id := b.nextID.Add(1) sub := &opsLogSubscription{ ch: make(chan OpsLogEntry, bufSize), filter: filter, } b.subsMu.Lock() b.subscribers[id] = sub b.subsMu.Unlock() var unsubOnce sync.Once unsubscribe := func() { unsubOnce.Do(func() { sub.closed.Store(true) b.subsMu.Lock() delete(b.subscribers, id) b.subsMu.Unlock() }) } return sub.ch, unsubscribe } // Snapshot returns a copy of the recent history (oldest → newest), filtered // by the given filter. Used by /admin/ops/logs/recent to prime newly opened // dashboards before live events arrive. func (b *OpsLogBroadcaster) Snapshot(filter OpsLogFilter, maxEntries int) []OpsLogEntry { b.historyMu.Lock() defer b.historyMu.Unlock() if b.histLen == 0 { return nil } out := make([]OpsLogEntry, 0, b.histLen) start := b.histHead - b.histLen if start < 0 { start += b.histCap } for i := 0; i < b.histLen; i++ { idx := (start + i) % b.histCap e := b.history[idx] if !filter.matches(&e) { continue } out = append(out, e) } if maxEntries > 0 && len(out) > maxEntries { out = out[len(out)-maxEntries:] } return out } // Stats reports cumulative publish/drop counts and the current subscriber // count for diagnostics. func (b *OpsLogBroadcaster) Stats() (published, dropped int64, subscribers int) { b.subsMu.RLock() subscribers = len(b.subscribers) b.subsMu.RUnlock() return b.publishedTotal.Load(), b.droppedTotal.Load(), subscribers } func (b *OpsLogBroadcaster) appendHistory(e OpsLogEntry) { if b.histCap == 0 { return } b.historyMu.Lock() defer b.historyMu.Unlock() b.history[b.histHead] = e b.histHead = (b.histHead + 1) % b.histCap if b.histLen < b.histCap { b.histLen++ } }