sub2api/backend/internal/handler/admin/ops_log_stream_handler.go
win de048fad25 chore(wip): save Windsurf/Antigravity/ops customizations before upstream merge
WIP commit保存以下定制工作以便后续合并 upstream v0.1.124-125:
- Windsurf: tier access service, NLU extractor, cold threshold, Google login
- Antigravity: client/oauth 调整
- Ops: log stream handler/broadcaster/middleware, OpsLogStreamView
- Frontend: WindsurfLoginModal Google, GoogleIcon, AccountsView, sidebar/router/i18n
2026-05-09 00:41:19 +08:00

211 lines
6.2 KiB
Go

package admin
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
const (
opsLogStreamHeartbeat = 25 * time.Second
opsLogStreamRecentMax = 200
opsLogStreamSubBufEntries = 1024
opsLogStreamModelMaxLen = 256
)
// LogStream serves a Server-Sent Events feed of every gateway request.
//
// GET /api/v1/admin/ops/logs/stream?min_status=400&model=glm-4.7&account_id=42&min_latency_ms=2000
//
// Filter query params (all optional, AND-combined):
//
// min_status — int only emit when entry.status >= this value
// model — exact match on model key
// account_id — int64
// group_id — int64
// min_latency_ms — int64 only emit when entry.latency_ms >= this value
//
// The handler keeps the connection open until the client disconnects, the
// monitoring is disabled, or the broadcaster is torn down. A heartbeat
// comment line is sent every 25s so reverse proxies don't time out idle
// streams.
func (h *OpsHandler) LogStream(c *gin.Context) {
if h.logBroadcaster == nil {
response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured")
return
}
// nil opsService is allowed for lightweight deployments / tests where
// the OpsService dependency is intentionally absent. The admin auth
// middleware on the route group still enforces JWT + admin role, so
// the stream is never reachable anonymously.
if h.opsService != nil {
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
}
filter, err := parseOpsLogFilter(c)
if err != nil {
response.BadRequest(c, err.Error())
return
}
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
response.Error(c, http.StatusInternalServerError, "streaming unsupported")
return
}
ch, unsubscribe := h.logBroadcaster.Subscribe(filter, opsLogStreamSubBufEntries)
defer unsubscribe()
// Prime client with recent buffered history (so a fresh dashboard tab
// renders something immediately instead of staying blank).
for _, e := range h.logBroadcaster.Snapshot(filter, opsLogStreamRecentMax) {
if err := writeOpsLogSSE(c.Writer, &e); err != nil {
return
}
}
flusher.Flush()
heartbeat := time.NewTicker(opsLogStreamHeartbeat)
defer heartbeat.Stop()
ctxDone := c.Request.Context().Done()
for {
select {
case <-ctxDone:
return
case <-heartbeat.C:
if _, err := io.WriteString(c.Writer, ": ping\n\n"); err != nil {
return
}
flusher.Flush()
case entry := <-ch:
if err := writeOpsLogSSE(c.Writer, &entry); err != nil {
return
}
flusher.Flush()
}
}
}
// LogStreamRecent returns the broadcaster history without subscribing.
// Useful for one-shot polling when the admin panel cannot keep an open
// SSE connection (e.g. behind a buffering proxy).
//
// GET /api/v1/admin/ops/logs/recent?min_status=400&max=500
func (h *OpsHandler) LogStreamRecent(c *gin.Context) {
if h.logBroadcaster == nil {
response.Error(c, http.StatusServiceUnavailable, "log broadcaster not configured")
return
}
if h.opsService != nil {
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
response.ErrorFrom(c, err)
return
}
}
filter, err := parseOpsLogFilter(c)
if err != nil {
response.BadRequest(c, err.Error())
return
}
maxN := opsLogStreamRecentMax
if v := strings.TrimSpace(c.Query("max")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 2000 {
maxN = n
}
}
published, dropped, subs := h.logBroadcaster.Stats()
response.Success(c, gin.H{
"entries": h.logBroadcaster.Snapshot(filter, maxN),
"published_total": published,
"dropped_total": dropped,
"subscribers": subs,
})
}
func parseOpsLogFilter(c *gin.Context) (service.OpsLogFilter, error) {
f := service.OpsLogFilter{}
if v := strings.TrimSpace(c.Query("min_status")); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n < 0 {
return f, fmt.Errorf("invalid min_status")
}
f.MinStatus = n
}
if v := strings.TrimSpace(c.Query("model")); v != "" {
// Cap input to keep an authenticated admin from stuffing huge
// strings into long-lived subscription state.
if len(v) > opsLogStreamModelMaxLen {
return f, fmt.Errorf("model too long (max %d)", opsLogStreamModelMaxLen)
}
f.Model = v
}
if v := strings.TrimSpace(c.Query("account_id")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
// Reject 0 — matches() treats AccountID==0 as "match all", so a
// param of 0 would silently degrade to no-filter without telling
// the user. Demand a positive id (callers wanting all should omit
// the param).
if err != nil || n <= 0 {
return f, fmt.Errorf("invalid account_id")
}
f.AccountID = n
}
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil || n <= 0 {
return f, fmt.Errorf("invalid group_id")
}
f.GroupID = n
}
if v := strings.TrimSpace(c.Query("min_latency_ms")); v != "" {
n, err := strconv.ParseInt(v, 10, 64)
if err != nil || n < 0 {
return f, fmt.Errorf("invalid min_latency_ms")
}
f.MinLatencyMs = n
}
return f, nil
}
// writeOpsLogSSE writes one SSE frame: an `event: log` line followed by a
// single `data:` line containing the JSON-encoded entry, terminated by a
// blank line per the SSE protocol.
//
// This assumes the JSON payload contains no bare LF — which holds because
// every string field in OpsLogEntry passes through encoding/json escaping.
// If a future field is added that emits raw bytes (e.g. a []byte body),
// the marshalled output must be split across multiple `data:` lines.
func writeOpsLogSSE(w io.Writer, e *service.OpsLogEntry) error {
payload, err := json.Marshal(e)
if err != nil {
return err
}
if _, err := io.WriteString(w, "event: log\ndata: "); err != nil {
return err
}
if _, err := w.Write(payload); err != nil {
return err
}
_, err = io.WriteString(w, "\n\n")
return err
}