feat: Complete Wire dependency injection and HTTP API routes for Antigravity

已完成:
1.  修复 SoraMediaCleanupService 未定义问题(从 provideCleanup 中删除)
2.  创建 LanguageServerService 的 Wire 提供函数
3.  更新 ProvideRouter 签名以接收 langServerService 参数
4.  更新 SetupRouter 和 registerRoutes 函数签名
5.  创建完整的 HTTP 路由处理器 (antigravity_http.go)
6.  注册 Antigravity HTTP 路由到 v1 API 分组
7.  Wire_gen.go 自动生成 LanguageServerService 的注入代码
8.  项目成功编译

三层架构已就位:
  下游客户端 (HTTP)
    ↓
  sub2api HTTP 服务层 (/api/v1/cascade/*)
    ↓
  LanguageServerService (业务逻辑层)
    ↓
  官方 Anthropic API (上游)

POST /api/v1/cascade/start      - 启动会话
POST /api/v1/cascade/message    - 发送消息(SSE 流式)
POST /api/v1/cascade/cancel     - 取消会话
GET  /api/v1/models             - 获取模型列表
GET  /api/v1/health             - 健康检查

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
win 2026-04-10 21:23:54 +08:00
parent 84555dcb44
commit b8a4372328
6 changed files with 209 additions and 10 deletions

View File

@ -76,7 +76,6 @@ func provideCleanup(
opsCleanup *service.OpsCleanupService,
opsScheduledReport *service.OpsScheduledReportService,
opsSystemLogSink *service.OpsSystemLogSink,
soraMediaCleanup *service.SoraMediaCleanupService,
schedulerSnapshot *service.SchedulerSnapshotService,
tokenRefresh *service.TokenRefreshService,
accountExpiry *service.AccountExpiryService,
@ -125,12 +124,6 @@ func provideCleanup(
}
return nil
}},
{"SoraMediaCleanupService", func() error {
if soraMediaCleanup != nil {
soraMediaCleanup.Stop()
}
return nil
}},
{"OpsAlertEvaluatorService", func() error {
if opsAlertEvaluator != nil {
opsAlertEvaluator.Stop()

View File

@ -186,6 +186,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
openAITokenProvider := service.ProvideOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService, oauthRefreshAPI)
openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider, modelPricingResolver, channelService)
geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig)
langServerService := service.ProvideLanguageServerService(httpUpstream)
opsSystemLogSink := service.ProvideOpsSystemLogSink(opsRepository)
opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink)
settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService)
@ -230,7 +231,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService)
adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService)
apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient, langServerService)
httpServer := server.ProvideHTTPServer(configConfig, engine)
opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig)
opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig)

View File

@ -36,6 +36,7 @@ func ProvideRouter(
opsService *service.OpsService,
settingService *service.SettingService,
redisClient *redis.Client,
langServerService *service.LanguageServerService,
) *gin.Engine {
if cfg.Server.Mode == "release" {
gin.SetMode(gin.ReleaseMode)
@ -56,7 +57,7 @@ func ProvideRouter(
}
}
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient)
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService)
}
// ProvideHTTPServer 提供 HTTP 服务器

View File

@ -32,6 +32,7 @@ func SetupRouter(
settingService *service.SettingService,
cfg *config.Config,
redisClient *redis.Client,
langServerService *service.LanguageServerService,
) *gin.Engine {
// 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src
var cachedFrameOrigins atomic.Pointer[[]string]
@ -81,7 +82,7 @@ func SetupRouter(
}
// 注册路由
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient)
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService)
return r
}
@ -99,6 +100,7 @@ func registerRoutes(
settingService *service.SettingService,
cfg *config.Config,
redisClient *redis.Client,
langServerService *service.LanguageServerService,
) {
// 通用路由(健康检查、状态等)
routes.RegisterCommonRoutes(r)
@ -111,4 +113,7 @@ func registerRoutes(
routes.RegisterUserRoutes(v1, h, jwtAuth, settingService)
routes.RegisterAdminRoutes(v1, h, adminAuth)
routes.RegisterGatewayRoutes(r, h, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg)
// 注册 Antigravity HTTP API 路由
routes.RegisterAntigravityHTTPRoutes(v1, langServerService)
}

View File

@ -0,0 +1,192 @@
package routes
import (
"encoding/json"
"log/slog"
"net/http"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// RegisterAntigravityHTTPRoutes 注册 Antigravity HTTP API 路由
func RegisterAntigravityHTTPRoutes(v1 *gin.RouterGroup, langServerService *service.LanguageServerService) {
logger := slog.Default()
// 创建处理器
cascadeGroup := v1.Group("/cascade")
{
// 启动 Cascade 会话
cascadeGroup.POST("/start", func(c *gin.Context) {
handleStartCascade(c, langServerService, logger)
})
// 发送消息到 Cascade流式响应
cascadeGroup.POST("/message", func(c *gin.Context) {
handleSendMessage(c, langServerService, logger)
})
// 取消 Cascade 会话
cascadeGroup.POST("/cancel", func(c *gin.Context) {
handleCancelCascade(c, langServerService, logger)
})
}
// 模型列表
v1.GET("/models", func(c *gin.Context) {
handleGetModels(c, langServerService, logger)
})
// 健康检查
v1.GET("/health", func(c *gin.Context) {
handleHealth(c, logger)
})
}
// handleStartCascade 处理启动 Cascade 请求
func handleStartCascade(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) {
type StartCascadeRequest struct {
Model string `json:"model" binding:"required"`
SystemPrompt string `json:"system_prompt"`
Metadata map[string]string `json:"metadata"`
}
var req StartCascadeRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.Error("invalid start cascade request", "error", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
// 获取 OAuth token
token := c.GetHeader("Authorization")
if token == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authorization header"})
return
}
// 调用服务
cascadeID, err := svc.StartCascade(
c.Request.Context(),
req.Model,
req.SystemPrompt,
req.Metadata,
token,
)
if err != nil {
logger.Error("start cascade failed", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"cascade_id": cascadeID})
}
// handleSendMessage 处理发送消息请求(流式)
func handleSendMessage(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) {
type SendMessageRequest struct {
CascadeID string `json:"cascade_id" binding:"required"`
Message string `json:"message" binding:"required"`
}
var req SendMessageRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.Error("invalid send message request", "error", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
// 获取 OAuth token
token := c.GetHeader("Authorization")
if token == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authorization header"})
return
}
// 调用服务并获取流式更新通道
updateChan, err := svc.SendUserMessage(c.Request.Context(), req.CascadeID, req.Message, token)
if err != nil {
logger.Error("send message failed", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
// 设置 SSE 响应头
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")
c.Status(http.StatusOK)
// 流式发送更新到客户端
flusher, ok := c.Writer.(http.Flusher)
if !ok {
logger.Error("response writer does not support flushing")
return
}
for event := range updateChan {
if event == nil {
break
}
// 将事件序列化为 JSON
eventJSON, err := marshalJSON(event)
if err != nil {
logger.Error("failed to marshal event", "error", err)
continue
}
// 发送 SSE 格式的数据
_, _ = c.Writer.WriteString("data: " + string(eventJSON) + "\n\n")
flusher.Flush()
}
}
// handleCancelCascade 处理取消 Cascade 请求
func handleCancelCascade(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) {
type CancelRequest struct {
CascadeID string `json:"cascade_id" binding:"required"`
}
var req CancelRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.Error("invalid cancel request", "error", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
err := svc.CancelCascade(c.Request.Context(), req.CascadeID)
if err != nil {
logger.Error("cancel cascade failed", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"message": "cascade cancelled"})
}
// handleGetModels 处理获取模型列表请求
func handleGetModels(c *gin.Context, svc *service.LanguageServerService, logger *slog.Logger) {
models, err := svc.GetAvailableModels(c.Request.Context())
if err != nil {
logger.Error("get models failed", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{
"models": models,
"default_model": "claude-opus-4-6",
})
}
// handleHealth 处理健康检查请求
func handleHealth(c *gin.Context, logger *slog.Logger) {
c.JSON(http.StatusOK, gin.H{"status": "healthy"})
}
// marshalJSON 辅助函数用于序列化事件
func marshalJSON(v interface{}) ([]byte, error) {
return json.Marshal(v)
}

View File

@ -3,6 +3,7 @@ package service
import (
"context"
"database/sql"
"log/slog"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
@ -378,6 +379,11 @@ func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupReposit
return svc
}
// ProvideLanguageServerService creates LanguageServerService with injected dependencies
func ProvideLanguageServerService(httpUpstream HTTPUpstream) *LanguageServerService {
return NewLanguageServerService(slog.Default(), httpUpstream)
}
// ProviderSet is the Wire provider set for all services
var ProviderSet = wire.NewSet(
// Core services
@ -460,4 +466,5 @@ var ProviderSet = wire.NewSet(
NewGroupCapacityService,
NewChannelService,
NewModelPricingResolver,
ProvideLanguageServerService,
)