sub2api/backend/internal/handler/ops_log_stream_middleware.go
win de048fad25 chore(wip): save Windsurf/Antigravity/ops customizations before upstream merge
WIP commit保存以下定制工作以便后续合并 upstream v0.1.124-125:
- Windsurf: tier access service, NLU extractor, cold threshold, Google login
- Antigravity: client/oauth 调整
- Ops: log stream handler/broadcaster/middleware, OpsLogStreamView
- Frontend: WindsurfLoginModal Google, GoogleIcon, AccountsView, sidebar/router/i18n
2026-05-09 00:41:19 +08:00

101 lines
2.6 KiB
Go

package handler
import (
"strings"
"time"
"github.com/gin-gonic/gin"
servermiddleware "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
)
// OpsLogStreamMiddleware fans every gateway request out to the in-memory
// OpsLogBroadcaster so admin tools can subscribe to a real-time SSE feed.
//
// This is intentionally separate from OpsErrorLoggerMiddleware:
// - OpsErrorLoggerMiddleware persists 4xx/5xx into the database for audit.
// - This middleware streams every request (success + failure) for live UX.
//
// The broadcaster.Publish call is non-blocking by design (see the
// implementation): a slow/missing subscriber NEVER stalls the request path.
// Empty broadcaster (nil receiver, or no subscribers) is a no-op.
func OpsLogStreamMiddleware(b *service.OpsLogBroadcaster) gin.HandlerFunc {
if b == nil {
return func(c *gin.Context) { c.Next() }
}
return func(c *gin.Context) {
start := time.Now()
c.Next()
entry := service.OpsLogEntry{
Time: start,
Method: c.Request.Method,
Path: c.Request.URL.Path,
Status: c.Writer.Status(),
LatencyMs: time.Since(start).Milliseconds(),
}
if v, ok := c.Get(opsModelKey); ok {
if s, ok := v.(string); ok {
entry.Model = s
}
}
if v, ok := c.Get(opsStreamKey); ok {
if streamFlag, ok := v.(bool); ok {
entry.Stream = streamFlag
}
}
if v, ok := c.Get(opsAccountIDKey); ok {
switch t := v.(type) {
case int64:
entry.AccountID = t
case int:
entry.AccountID = int64(t)
}
}
// Best-effort api-key + group + user from middleware context.
if apiKey, ok := servermiddleware.GetAPIKeyFromContext(c); ok && apiKey != nil {
entry.APIKeyID = apiKey.ID
if apiKey.GroupID != nil {
entry.GroupID = *apiKey.GroupID
}
entry.UserID = apiKey.UserID
}
// Pull upstream error context (set by gateway services on retries).
if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok {
switch t := v.(type) {
case int:
entry.UpstreamCode = t
case int64:
entry.UpstreamCode = int(t)
}
}
if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok {
if s, ok := v.(string); ok {
entry.ErrorMessage = trimForStream(s)
}
}
if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok {
if s, ok := v.(string); ok {
entry.ErrorDetail = trimForStream(s)
}
}
b.Publish(entry)
}
}
// trimForStream caps long error strings so a single broken upstream cannot
// flood the SSE channel with megabyte-sized error blobs.
func trimForStream(s string) string {
const max = 512
s = strings.TrimSpace(s)
if len(s) > max {
return s[:max] + "…"
}
return s
}