sub2api/backend/internal/service/ops_log_broadcaster.go
win de048fad25 chore(wip): save Windsurf/Antigravity/ops customizations before upstream merge
WIP commit保存以下定制工作以便后续合并 upstream v0.1.124-125:
- Windsurf: tier access service, NLU extractor, cold threshold, Google login
- Antigravity: client/oauth 调整
- Ops: log stream handler/broadcaster/middleware, OpsLogStreamView
- Frontend: WindsurfLoginModal Google, GoogleIcon, AccountsView, sidebar/router/i18n
2026-05-09 00:41:19 +08:00

238 lines
7.2 KiB
Go

// Package service exposes domain services. This file provides a lightweight
// in-memory pub/sub for streaming admin log events without persisting them.
package service
import (
"sync"
"sync/atomic"
"time"
)
// OpsLogEntry is a single event pushed through the ops log stream.
//
// Time, Status and LatencyMs are always present in the JSON encoding; every
// other field carries omitempty and is left out when it holds its zero value.
type OpsLogEntry struct {
	// Time is the event timestamp. Publish fills it in when it is left zero.
	Time time.Time `json:"time"`

	// Request line.
	Method string `json:"method,omitempty"`
	Path   string `json:"path,omitempty"`

	// Outcome. Neither tag has omitempty, so both are serialized even
	// when zero.
	Status    int   `json:"status"`
	LatencyMs int64 `json:"latency_ms"`

	// Request attributes.
	Model  string `json:"model,omitempty"`
	Stream bool   `json:"stream,omitempty"`

	// Identifiers attached to the request.
	AccountID int64 `json:"account_id,omitempty"`
	GroupID   int64 `json:"group_id,omitempty"`
	APIKeyID  int64 `json:"api_key_id,omitempty"`
	UserID    int64 `json:"user_id,omitempty"`

	// Request size figures.
	// NOTE(review): exact semantics are defined by the producer — confirm.
	Turns       int `json:"turns,omitempty"`
	PromptChars int `json:"prompt_chars,omitempty"`

	// Failure details. Note that UpstreamCode is serialized under the key
	// "upstream_status", not "upstream_code".
	ErrorMessage string `json:"error_message,omitempty"`
	ErrorDetail  string `json:"error_detail,omitempty"`
	UpstreamCode int    `json:"upstream_status,omitempty"`
}
// OpsLogFilter narrows the set of entries a subscriber or a Snapshot call
// receives. A field left at its zero value places no restriction; all
// populated fields must hold at once for an entry to pass.
type OpsLogFilter struct {
	MinStatus int    // lowest Status accepted; values <= 0 disable the check
	Model     string // exact Model required; "" disables the check

	// Exact AccountID / GroupID required; values <= 0 disable the check.
	AccountID, GroupID int64

	MinLatencyMs int64 // lowest LatencyMs accepted; values <= 0 disable the check
}
// matches reports whether e satisfies every criterion that is set on f.
// A criterion left at its zero value (or, for the numeric ones, any value
// <= 0) is treated as "not set" and never rejects an entry.
func (f OpsLogFilter) matches(e *OpsLogEntry) bool {
	statusOK := f.MinStatus <= 0 || e.Status >= f.MinStatus
	modelOK := f.Model == "" || e.Model == f.Model
	accountOK := f.AccountID <= 0 || e.AccountID == f.AccountID
	groupOK := f.GroupID <= 0 || e.GroupID == f.GroupID
	latencyOK := f.MinLatencyMs <= 0 || e.LatencyMs >= f.MinLatencyMs

	return statusOK && modelOK && accountOK && groupOK && latencyOK
}
// OpsLogBroadcaster fans published entries out to in-process subscribers
// and keeps a bounded ring of recent entries so that a freshly connected
// client can prime its view via Snapshot. Deliveries are non-blocking
// per subscriber, so a slow client cannot stall a publish on the request
// path.
type OpsLogBroadcaster struct {
	// Live subscriptions, keyed by ids drawn from nextID. The map is
	// guarded by subsMu.
	subsMu      sync.RWMutex
	subscribers map[int64]*opsLogSubscription
	nextID      atomic.Int64

	// Ring buffer of recent entries, guarded by historyMu.
	historyMu sync.Mutex
	history   []OpsLogEntry

	// histHead is the slot the next entry is written to, histLen the
	// number of valid entries, histCap the fixed ring size (0 means the
	// history is disabled).
	histHead, histLen, histCap int

	// Cumulative counters surfaced through Stats. They are atomics, so
	// reading them needs no lock.
	publishedTotal, droppedTotal atomic.Int64
}
// opsLogSubscription is the broadcaster-side record of one subscriber.
type opsLogSubscription struct {
	ch     chan OpsLogEntry // buffered delivery channel handed out by Subscribe
	filter OpsLogFilter     // evaluated by Publish before each send

	// closed is set to true by the unsubscribe func returned from
	// Subscribe, before the subscription is removed from the map. Publish
	// checks it ahead of every send and skips flagged subscriptions.
	// Nothing in this file closes ch, so this flag — not a channel close —
	// is what tells Publish to stop delivering.
	closed atomic.Bool
}
// NewOpsLogBroadcaster returns a broadcaster with no subscribers and an
// empty history ring of historyCap slots. The ring is what Snapshot serves
// to newly connected clients; 1000 is a reasonable size. A historyCap of
// zero or less turns the ring off: nothing is allocated for it and
// published entries are not retained.
func NewOpsLogBroadcaster(historyCap int) *OpsLogBroadcaster {
	var ring []OpsLogEntry
	switch {
	case historyCap > 0:
		ring = make([]OpsLogEntry, historyCap)
	default:
		// Negative sizes are treated the same as zero.
		historyCap = 0
	}

	return &OpsLogBroadcaster{
		subscribers: make(map[int64]*opsLogSubscription),
		history:     ring,
		histCap:     historyCap,
	}
}
// Publish records entry in the history ring and offers it to every live
// subscriber whose filter accepts it. A zero entry.Time is replaced with
// time.Now() before anything else happens.
//
// Publish never waits for a subscriber: each delivery is a non-blocking
// send, and when a subscriber's buffer is full the entry is skipped for
// that subscriber and droppedTotal is incremented. publishedTotal counts
// every call, whether or not anyone receives the entry.
//
// The entry is handed to the ring and to each channel by value, so the
// struct a receiver holds is its own copy.
//
// Fan-out runs under subsMu.RLock rather than over a copied subscriber
// list. Because no send can block, the read lock is only held briefly, no
// per-call slice is allocated on the request path, and once an unsubscribe
// call has returned (it takes the write lock) no further send can reach
// that subscription's channel.
func (b *OpsLogBroadcaster) Publish(entry OpsLogEntry) {
	if entry.Time.IsZero() {
		entry.Time = time.Now()
	}
	b.publishedTotal.Add(1)
	b.appendHistory(entry)

	b.subsMu.RLock()
	defer b.subsMu.RUnlock()

	for _, sub := range b.subscribers {
		// closed is set before the subscription leaves the map, so this
		// also skips a subscriber that is in the middle of unsubscribing.
		if sub.closed.Load() || !sub.filter.matches(&entry) {
			continue
		}
		select {
		case sub.ch <- entry:
		default:
			// Buffer full: drop for this subscriber only and keep going.
			b.droppedTotal.Add(1)
		}
	}
}
// Subscribe adds a listener that receives every published entry accepted
// by filter. It returns the receive side of a buffered channel and a cancel
// func. bufSize sets the channel capacity; values <= 0 select 1024. The
// caller must keep draining the channel: once the buffer is full, Publish
// drops entries for this subscriber instead of waiting.
//
// The cancel func may be called any number of times from any goroutine;
// only the first call has an effect. It marks the subscription closed and
// removes it from the broadcaster. It deliberately does NOT close the
// channel: a Publish running concurrently could otherwise hit a
// send-on-closed-channel panic. A send that races with cancellation at
// worst lands in the buffer and is reclaimed with the channel once the
// caller drops it. Receivers therefore cannot use channel close as their
// stop signal and should watch their own context instead.
func (b *OpsLogBroadcaster) Subscribe(filter OpsLogFilter, bufSize int) (<-chan OpsLogEntry, func()) {
	const defaultBufSize = 1024

	capacity := bufSize
	if capacity <= 0 {
		capacity = defaultBufSize
	}

	key := b.nextID.Add(1)
	reg := &opsLogSubscription{
		ch:     make(chan OpsLogEntry, capacity),
		filter: filter,
	}

	b.subsMu.Lock()
	b.subscribers[key] = reg
	b.subsMu.Unlock()

	// detach flags the subscription first so Publish stops sending to it,
	// then drops it from the map.
	detach := func() {
		reg.closed.Store(true)

		b.subsMu.Lock()
		defer b.subsMu.Unlock()
		delete(b.subscribers, key)
	}

	var once sync.Once
	return reg.ch, func() { once.Do(detach) }
}
// Snapshot returns copies of the retained history entries accepted by
// filter, ordered oldest to newest. When maxEntries > 0 only the newest
// maxEntries matches are returned; maxEntries <= 0 means no limit. It is
// meant for priming a newly opened dashboard before live events arrive.
//
// The result is nil when the history holds nothing at all (including when
// the ring is disabled) and a non-nil, possibly empty slice otherwise.
//
// historyMu is held for the duration of the call, and Publish needs that
// same lock to append. To keep the hold short the ring is walked from the
// newest entry backwards and the walk stops as soon as maxEntries matches
// are collected; the output buffer is sized to that bound as well, so a
// small request neither scans nor pins memory for the whole ring.
func (b *OpsLogBroadcaster) Snapshot(filter OpsLogFilter, maxEntries int) []OpsLogEntry {
	b.historyMu.Lock()
	defer b.historyMu.Unlock()

	if b.histLen == 0 {
		return nil
	}

	// Upper bound on how many entries can be returned.
	limit := b.histLen
	if maxEntries > 0 && maxEntries < limit {
		limit = maxEntries
	}

	// Fill the buffer from the back while walking newest → oldest, so the
	// used tail ends up in oldest → newest order without a reverse pass.
	out := make([]OpsLogEntry, limit)
	next := limit
	idx := b.histHead // one past the newest entry
	for seen := 0; seen < b.histLen && next > 0; seen++ {
		if idx--; idx < 0 {
			idx = b.histCap - 1
		}
		// Match in place; only accepted entries are copied out.
		e := &b.history[idx]
		if !filter.matches(e) {
			continue
		}
		next--
		out[next] = *e
	}
	return out[next:]
}
// Stats returns diagnostics for the broadcaster: the cumulative number of
// Publish calls, the cumulative number of per-subscriber drops, and how
// many subscriptions are registered right now. The subscriber count is read
// under subsMu; the two counters are atomics read after the lock has been
// released, so the three values are not one consistent snapshot.
func (b *OpsLogBroadcaster) Stats() (published, dropped int64, subscribers int) {
	b.subsMu.RLock()
	live := len(b.subscribers)
	b.subsMu.RUnlock()

	published = b.publishedTotal.Load()
	dropped = b.droppedTotal.Load()
	return published, dropped, live
}
// appendHistory stores e in the ring buffer, overwriting the oldest entry
// once the ring is full. It is a no-op when the history is disabled
// (histCap == 0); in that case the lock is not taken at all.
func (b *OpsLogBroadcaster) appendHistory(e OpsLogEntry) {
	if b.histCap == 0 {
		return
	}

	b.historyMu.Lock()
	defer b.historyMu.Unlock()

	slot := b.histHead
	b.history[slot] = e

	// Move the write cursor forward, wrapping to slot 0 after the last one.
	b.histHead = (slot + 1) % b.histCap

	// The ring only grows until it is full; after that each write replaces
	// the oldest entry and the length stays at histCap.
	if full := b.histLen >= b.histCap; !full {
		b.histLen++
	}
}