package admin import ( "encoding/json" "fmt" "io" "net/http" "strconv" "strings" "time" "github.com/Wei-Shaw/sub2api/internal/pkg/response" "github.com/Wei-Shaw/sub2api/internal/service" "github.com/gin-gonic/gin" ) const ( opsLogStreamHeartbeat = 25 * time.Second opsLogStreamRecentMax = 200 opsLogStreamSubBufEntries = 1024 opsLogStreamModelMaxLen = 256 ) // LogStream serves a Server-Sent Events feed of every gateway request. // // GET /api/v1/admin/ops/logs/stream?min_status=400&model=glm-4.7&account_id=42&min_latency_ms=2000 // // Filter query params (all optional, AND-combined): // // min_status — int only emit when entry.status >= this value // model — exact match on model key // account_id — int64 // group_id — int64 // min_latency_ms — int64 only emit when entry.latency_ms >= this value // // The handler keeps the connection open until the client disconnects, the // monitoring is disabled, or the broadcaster is torn down. A heartbeat // comment line is sent every 25s so reverse proxies don't time out idle // streams. func (h *OpsHandler) LogStream(c *gin.Context) { if h.logBroadcaster == nil { response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured") return } // nil opsService is allowed for lightweight deployments / tests where // the OpsService dependency is intentionally absent. The admin auth // middleware on the route group still enforces JWT + admin role, so // the stream is never reachable anonymously. if h.opsService != nil { if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { response.ErrorFrom(c, err) return } } filter, err := parseOpsLogFilter(c) if err != nil { response.BadRequest(c, err.Error()) return } c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("X-Accel-Buffering", "no") flusher, ok := c.Writer.(http.Flusher) if !ok { response.Error(c, http.StatusInternalServerError, "streaming unsupported") return } ch, unsubscribe := h.logBroadcaster.Subscribe(filter, opsLogStreamSubBufEntries) defer unsubscribe() // Prime client with recent buffered history (so a fresh dashboard tab // renders something immediately instead of staying blank). for _, e := range h.logBroadcaster.Snapshot(filter, opsLogStreamRecentMax) { if err := writeOpsLogSSE(c.Writer, &e); err != nil { return } } flusher.Flush() heartbeat := time.NewTicker(opsLogStreamHeartbeat) defer heartbeat.Stop() ctxDone := c.Request.Context().Done() for { select { case <-ctxDone: return case <-heartbeat.C: if _, err := io.WriteString(c.Writer, ": ping\n\n"); err != nil { return } flusher.Flush() case entry := <-ch: if err := writeOpsLogSSE(c.Writer, &entry); err != nil { return } flusher.Flush() } } } // LogStreamRecent returns the broadcaster history without subscribing. // Useful for one-shot polling when the admin panel cannot keep an open // SSE connection (e.g. behind a buffering proxy). // // GET /api/v1/admin/ops/logs/recent?min_status=400&max=500 func (h *OpsHandler) LogStreamRecent(c *gin.Context) { if h.logBroadcaster == nil { response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured") return } if h.opsService != nil { if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { response.ErrorFrom(c, err) return } } filter, err := parseOpsLogFilter(c) if err != nil { response.BadRequest(c, err.Error()) return } maxN := opsLogStreamRecentMax if v := strings.TrimSpace(c.Query("max")); v != "" { if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 2000 { maxN = n } } published, dropped, subs := h.logBroadcaster.Stats() response.Success(c, gin.H{ "entries": h.logBroadcaster.Snapshot(filter, maxN), "published_total": published, "dropped_total": dropped, "subscribers": subs, }) } func parseOpsLogFilter(c *gin.Context) (service.OpsLogFilter, error) { f := service.OpsLogFilter{} if v := strings.TrimSpace(c.Query("min_status")); v != "" { n, err := strconv.Atoi(v) if err != nil || n < 0 { return f, fmt.Errorf("invalid min_status") } f.MinStatus = n } if v := strings.TrimSpace(c.Query("model")); v != "" { // Cap input to keep an authenticated admin from stuffing huge // strings into long-lived subscription state. if len(v) > opsLogStreamModelMaxLen { return f, fmt.Errorf("model too long (max %d)", opsLogStreamModelMaxLen) } f.Model = v } if v := strings.TrimSpace(c.Query("account_id")); v != "" { n, err := strconv.ParseInt(v, 10, 64) // Reject 0 — matches() treats AccountID==0 as "match all", so a // param of 0 would silently degrade to no-filter without telling // the user. Demand a positive id (callers wanting all should omit // the param). if err != nil || n <= 0 { return f, fmt.Errorf("invalid account_id") } f.AccountID = n } if v := strings.TrimSpace(c.Query("group_id")); v != "" { n, err := strconv.ParseInt(v, 10, 64) if err != nil || n <= 0 { return f, fmt.Errorf("invalid group_id") } f.GroupID = n } if v := strings.TrimSpace(c.Query("min_latency_ms")); v != "" { n, err := strconv.ParseInt(v, 10, 64) if err != nil || n < 0 { return f, fmt.Errorf("invalid min_latency_ms") } f.MinLatencyMs = n } return f, nil } // writeOpsLogSSE writes one SSE frame: an `event: log` line followed by a // single `data:` line containing the JSON-encoded entry, terminated by a // blank line per the SSE protocol. // // This assumes the JSON payload contains no bare LF — which holds because // every string field in OpsLogEntry passes through encoding/json escaping. // If a future field is added that emits raw bytes (e.g. a []byte body), // the marshalled output must be split across multiple `data:` lines. func writeOpsLogSSE(w io.Writer, e *service.OpsLogEntry) error { payload, err := json.Marshal(e) if err != nil { return err } if _, err := io.WriteString(w, "event: log\ndata: "); err != nil { return err } if _, err := w.Write(payload); err != nil { return err } _, err = io.WriteString(w, "\n\n") return err }