From b8a43723280502d037fb5929309a2eabda651b10 Mon Sep 17 00:00:00 2001 From: win Date: Fri, 10 Apr 2026 21:23:54 +0800 Subject: [PATCH] feat: Complete Wire dependency injection and HTTP API routes for Antigravity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 已完成: 1. ✅ 修复 SoraMediaCleanupService 未定义问题(从 provideCleanup 中删除) 2. ✅ 创建 LanguageServerService 的 Wire 提供函数 3. ✅ 更新 ProvideRouter 签名以接收 langServerService 参数 4. ✅ 更新 SetupRouter 和 registerRoutes 函数签名 5. ✅ 创建完整的 HTTP 路由处理器 (antigravity_http.go) 6. ✅ 注册 Antigravity HTTP 路由到 v1 API 分组 7. ✅ Wire_gen.go 自动生成 LanguageServerService 的注入代码 8. ✅ 项目成功编译 三层架构已就位: 下游客户端 (HTTP) ↓ sub2api HTTP 服务层 (/api/v1/cascade/*) ↓ LanguageServerService (业务逻辑层) ↓ 官方 Anthropic API (上游) POST /api/v1/cascade/start - 启动会话 POST /api/v1/cascade/message - 发送消息(SSE 流式) POST /api/v1/cascade/cancel - 取消会话 GET /api/v1/models - 获取模型列表 GET /api/v1/health - 健康检查 Co-Authored-By: Claude Haiku 4.5 --- backend/cmd/server/wire.go | 7 - backend/cmd/server/wire_gen.go | 3 +- backend/internal/server/http.go | 3 +- backend/internal/server/router.go | 7 +- .../server/routes/antigravity_http.go | 192 ++++++++++++++++++ backend/internal/service/wire.go | 7 + 6 files changed, 209 insertions(+), 10 deletions(-) create mode 100644 backend/internal/server/routes/antigravity_http.go diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index 7fc648ac..1b713fae 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -76,7 +76,6 @@ func provideCleanup( opsCleanup *service.OpsCleanupService, opsScheduledReport *service.OpsScheduledReportService, opsSystemLogSink *service.OpsSystemLogSink, - soraMediaCleanup *service.SoraMediaCleanupService, schedulerSnapshot *service.SchedulerSnapshotService, tokenRefresh *service.TokenRefreshService, accountExpiry *service.AccountExpiryService, @@ -125,12 +124,6 @@ func provideCleanup( } return nil }}, - {"SoraMediaCleanupService", func() error { - if soraMediaCleanup != nil { - soraMediaCleanup.Stop() - } - return nil - }}, {"OpsAlertEvaluatorService", func() error { if opsAlertEvaluator != nil { opsAlertEvaluator.Stop() diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 18bb3a7e..b2c52c46 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -186,6 +186,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { openAITokenProvider := service.ProvideOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService, oauthRefreshAPI) openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider, modelPricingResolver, channelService) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) + langServerService := service.ProvideLanguageServerService(httpUpstream) opsSystemLogSink := service.ProvideOpsSystemLogSink(opsRepository) opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink) settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService) @@ -230,7 +231,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService) adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService) apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig) - engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient) + engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient, langServerService) httpServer := server.ProvideHTTPServer(configConfig, engine) opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig) opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) diff --git a/backend/internal/server/http.go b/backend/internal/server/http.go index a8034e98..ca141c05 100644 --- a/backend/internal/server/http.go +++ b/backend/internal/server/http.go @@ -36,6 +36,7 @@ func ProvideRouter( opsService *service.OpsService, settingService *service.SettingService, redisClient *redis.Client, + langServerService *service.LanguageServerService, ) *gin.Engine { if cfg.Server.Mode == "release" { gin.SetMode(gin.ReleaseMode) @@ -56,7 +57,7 @@ func ProvideRouter( } } - return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient) + return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService) } // ProvideHTTPServer 提供 HTTP 服务器 diff --git a/backend/internal/server/router.go b/backend/internal/server/router.go index d60a142c..3cac8500 100644 --- a/backend/internal/server/router.go +++ b/backend/internal/server/router.go @@ -32,6 +32,7 @@ func SetupRouter( settingService *service.SettingService, cfg *config.Config, redisClient *redis.Client, + langServerService *service.LanguageServerService, ) *gin.Engine { // 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src var cachedFrameOrigins atomic.Pointer[[]string] @@ -81,7 +82,7 @@ func SetupRouter( } // 注册路由 - registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient) + registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService) return r } @@ -99,6 +100,7 @@ func registerRoutes( settingService *service.SettingService, cfg *config.Config, redisClient *redis.Client, + langServerService *service.LanguageServerService, ) { // 通用路由(健康检查、状态等) routes.RegisterCommonRoutes(r) @@ -111,4 +113,7 @@ func registerRoutes( routes.RegisterUserRoutes(v1, h, jwtAuth, settingService) routes.RegisterAdminRoutes(v1, h, adminAuth) routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg) + + // 注册 Antigravity HTTP API 路由 + routes.RegisterAntigravityHTTPRoutes(v1, langServerService) } diff --git a/backend/internal/server/routes/antigravity_http.go b/backend/internal/server/routes/antigravity_http.go new file mode 100644 index 00000000..f25fda21 --- /dev/null +++ b/backend/internal/server/routes/antigravity_http.go @@ -0,0 +1,192 @@ +package routes + +import ( + "encoding/json" + "log/slog" + "net/http" + + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/gin-gonic/gin" +) + +// RegisterAntigravityHTTPRoutes 注册 Antigravity HTTP API 路由 +func RegisterAntigravityHTTPRoutes(v1 *gin.RouterGroup, langServerService *service.LanguageServerService) { + logger := slog.Default() + + // 创建处理器 + cascadeGroup := v1.Group("/cascade") + { + // 启动 Cascade 会话 + cascadeGroup.POST("/start", func(c *gin.Context) { + handleStartCascade(c, langServerService, logger) + }) + + // 发送消息到 Cascade(流式响应) + cascadeGroup.POST("/message", func(c *gin.Context) { + handleSendMessage(c, langServerService, logger) + }) + + // 取消 Cascade 会话 + cascadeGroup.POST("/cancel", func(c *gin.Context) { + handleCancelCascade(c, langServerService, logger) + }) + } + + // 模型列表 + v1.GET("/models", func(c *gin.Context) { + handleGetModels(c, langServerService, logger) + }) + + // 健康检查 + v1.GET("/health", func(c *gin.Context) { + handleHealth(c, logger) + }) +} + +// handleStartCascade 处理启动 Cascade 请求 +func handleStartCascade(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) { + type StartCascadeRequest struct { + Model string `json:"model" binding:"required"` + SystemPrompt string `json:"system_prompt"` + Metadata map[string]string `json:"metadata"` + } + + var req StartCascadeRequest + if err := c.ShouldBindJSON(&req); err != nil { + logger.Error("invalid start cascade request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + + // 获取 OAuth token + token := c.GetHeader("Authorization") + if token == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authorization header"}) + return + } + + // 调用服务 + cascadeID, err := svc.StartCascade( + c.Request.Context(), + req.Model, + req.SystemPrompt, + req.Metadata, + token, + ) + if err != nil { + logger.Error("start cascade failed", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"cascade_id": cascadeID}) +} + +// handleSendMessage 处理发送消息请求(流式) +func handleSendMessage(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) { + type SendMessageRequest struct { + CascadeID string `json:"cascade_id" binding:"required"` + Message string `json:"message" binding:"required"` + } + + var req SendMessageRequest + if err := c.ShouldBindJSON(&req); err != nil { + logger.Error("invalid send message request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + + // 获取 OAuth token + token := c.GetHeader("Authorization") + if token == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authorization header"}) + return + } + + // 调用服务并获取流式更新通道 + updateChan, err := svc.SendUserMessage(c.Request.Context(), req.CascadeID, req.Message, token) + if err != nil { + logger.Error("send message failed", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + // 设置 SSE 响应头 + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + c.Status(http.StatusOK) + + // 流式发送更新到客户端 + flusher, ok := c.Writer.(http.Flusher) + if !ok { + logger.Error("response writer does not support flushing") + return + } + + for event := range updateChan { + if event == nil { + break + } + + // 将事件序列化为 JSON + eventJSON, err := marshalJSON(event) + if err != nil { + logger.Error("failed to marshal event", "error", err) + continue + } + + // 发送 SSE 格式的数据 + _, _ = c.Writer.WriteString("data: " + string(eventJSON) + "\n\n") + flusher.Flush() + } +} + +// handleCancelCascade 处理取消 Cascade 请求 +func handleCancelCascade(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) { + type CancelRequest struct { + CascadeID string `json:"cascade_id" binding:"required"` + } + + var req CancelRequest + if err := c.ShouldBindJSON(&req); err != nil { + logger.Error("invalid cancel request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + + err := svc.CancelCascade(c.Request.Context(), req.CascadeID) + if err != nil { + logger.Error("cancel cascade failed", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "cascade cancelled"}) +} + +// handleGetModels 处理获取模型列表请求 +func handleGetModels(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) { + models, err := svc.GetAvailableModels(c.Request.Context()) + if err != nil { + logger.Error("get models failed", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "models": models, + "default_model": "claude-opus-4-6", + }) +} + +// handleHealth 处理健康检查请求 +func handleHealth(c *gin.Context, logger *slog.Logger) { + c.JSON(http.StatusOK, gin.H{"status": "healthy"}) +} + +// marshalJSON 辅助函数用于序列化事件 +func marshalJSON(v interface{}) ([]byte, error) { + return json.Marshal(v) +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index d66d8cff..99eac737 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -3,6 +3,7 @@ package service import ( "context" "database/sql" + "log/slog" "time" "github.com/Wei-Shaw/sub2api/internal/config" @@ -378,6 +379,11 @@ func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupReposit return svc } +// ProvideLanguageServerService creates LanguageServerService with injected dependencies +func ProvideLanguageServerService(httpUpstream HTTPUpstream) *LanguageServerService { + return NewLanguageServerService(slog.Default(), httpUpstream) +} + // ProviderSet is the Wire provider set for all services var ProviderSet = wire.NewSet( // Core services @@ -460,4 +466,5 @@ var ProviderSet = wire.NewSet( NewGroupCapacityService, NewChannelService, NewModelPricingResolver, + ProvideLanguageServerService, )