package taskcenter import ( "bindbox-game/internal/pkg/async" "bindbox-game/internal/pkg/logger" "bindbox-game/internal/repository/mysql" "bindbox-game/internal/repository/mysql/dao" tcmodel "bindbox-game/internal/repository/mysql/task_center" "context" "encoding/json" "errors" "fmt" "time" titlesvc "bindbox-game/internal/service/title" usersvc "bindbox-game/internal/service/user" "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/datatypes" "gorm.io/gorm" "gorm.io/gorm/clause" ) type Service interface { ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error DeleteTask(ctx context.Context, id int64) error ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error OnOrderPaid(ctx context.Context, userID int64, orderID int64) error OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error StartWorker(ctx context.Context) } type service struct { logger logger.CustomLogger readDB *dao.Query writeDB *dao.Query repo mysql.Repo redis *redis.Client queue async.TaskQueue userSvc usersvc.Service titleSvc titlesvc.Service } func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service { var q async.TaskQueue if rdb != nil { q = async.NewRedisTaskQueue(rdb) } return &service{ logger: l, readDB: dao.Use(db.GetDbR()), writeDB: dao.Use(db.GetDbW()), repo: db, redis: rdb, queue: q, userSvc: userSvc, titleSvc: titleSvc, } } type ListTasksInput struct { Page int PageSize int } type TaskItem struct { ID int64 Name string Description string Status int32 StartTime int64 EndTime int64 Visibility int32 Tiers []TaskTierItem Rewards []TaskRewardItem } type UserProgress struct { TaskID int64 UserID int64 OrderCount int64 InviteCount int64 FirstOrder bool ClaimedTiers []int64 } type CreateTaskInput struct { Name string Description string Status int32 StartTime *time.Time EndTime *time.Time Visibility int32 } type ModifyTaskInput struct { Name string Description string Status int32 StartTime *time.Time EndTime *time.Time Visibility int32 } type TaskTierInput struct { Metric string Operator string Threshold int64 Window string Repeatable int32 Priority int32 ExtraParams datatypes.JSON } type TaskTierItem struct { ID int64 `json:"id"` Metric string `json:"metric"` Operator string `json:"operator"` Threshold int64 `json:"threshold"` Window string `json:"window"` Repeatable int32 `json:"repeatable"` Priority int32 `json:"priority"` ExtraParams datatypes.JSON `json:"extra_params"` } type TaskRewardInput struct { TierID int64 `json:"tier_id"` RewardType string `json:"reward_type"` RewardPayload datatypes.JSON `json:"reward_payload"` Quantity int64 `json:"quantity"` } type TaskRewardItem struct { ID int64 `json:"id"` TierID int64 `json:"tier_id"` RewardType string `json:"reward_type"` RewardPayload datatypes.JSON `json:"reward_payload"` Quantity int64 `json:"quantity"` } func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) { db := s.repo.GetDbR() var rows []tcmodel.Task q := db.Model(&tcmodel.Task{}) if in.PageSize <= 0 { in.PageSize = 20 } if in.Page <= 0 { in.Page = 1 } if err = q.Count(&total).Error; err != nil { return nil, 0, err } if err = q.Preload("Tiers", func(db *gorm.DB) *gorm.DB { return db.Order("priority asc, id asc") }).Preload("Rewards", func(db *gorm.DB) *gorm.DB { return db.Order("id asc") }).Offset((in.Page - 1) * in.PageSize).Limit(in.PageSize).Order("id desc").Find(&rows).Error; err != nil { return nil, 0, err } out := make([]TaskItem, len(rows)) for i, v := range rows { var st, et int64 if v.StartTime != nil { st = v.StartTime.Unix() } if v.EndTime != nil { et = v.EndTime.Unix() } out[i] = TaskItem{ID: v.ID, Name: v.Name, Description: v.Description, Status: v.Status, StartTime: st, EndTime: et, Visibility: v.Visibility} // 填充 Tiers out[i].Tiers = make([]TaskTierItem, len(v.Tiers)) for j, t := range v.Tiers { out[i].Tiers[j] = TaskTierItem{ID: t.ID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ExtraParams: t.ExtraParams} } // 填充 Rewards out[i].Rewards = make([]TaskRewardItem, len(v.Rewards)) for j, r := range v.Rewards { out[i].Rewards[j] = TaskRewardItem{ID: r.ID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity} } } return out, total, nil } func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) { db := s.repo.GetDbR() var row tcmodel.UserTaskProgress if err := db.Where("user_id=? AND task_id=?", userID, taskID).First(&row).Error; err != nil { return &UserProgress{TaskID: taskID, UserID: userID, ClaimedTiers: []int64{}}, nil } var claimed []int64 if len(row.ClaimedTiers) > 0 { _ = json.Unmarshal([]byte(row.ClaimedTiers), &claimed) } return &UserProgress{TaskID: taskID, UserID: userID, OrderCount: row.OrderCount, InviteCount: row.InviteCount, FirstOrder: row.FirstOrder == 1, ClaimedTiers: claimed}, nil } func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error { return s.repo.GetDbW().Transaction(func(tx *gorm.DB) error { var p tcmodel.UserTaskProgress if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", userID, taskID).First(&p).Error; err != nil { return errors.New("progress_not_found") } var claimed []int64 if len(p.ClaimedTiers) > 0 { _ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed) } for _, id := range claimed { if id == tierID { return nil } } claimed = append(claimed, tierID) b, _ := json.Marshal(claimed) p.ClaimedTiers = datatypes.JSON(b) return tx.Model(&tcmodel.UserTaskProgress{}).Where("id=?", p.ID).Update("claimed_tiers", p.ClaimedTiers).Error }) } func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) { db := s.repo.GetDbW() row := &tcmodel.Task{Name: in.Name, Description: in.Description, Status: in.Status, StartTime: in.StartTime, EndTime: in.EndTime, Visibility: in.Visibility} if err := db.Create(row).Error; err != nil { return 0, err } return row.ID, s.invalidateCache(ctx) } func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error { db := s.repo.GetDbW() if err := db.Model(&tcmodel.Task{}).Where("id=?", id).Updates(map[string]any{"name": in.Name, "description": in.Description, "status": in.Status, "start_time": in.StartTime, "end_time": in.EndTime, "visibility": in.Visibility}).Error; err != nil { return err } return s.invalidateCache(ctx) } func (s *service) DeleteTask(ctx context.Context, id int64) error { db := s.repo.GetDbW() if err := db.Where("id=?", id).Delete(&tcmodel.Task{}).Error; err != nil { return err } return s.invalidateCache(ctx) } func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) { db := s.repo.GetDbR() var rows []tcmodel.TaskTier if err := db.Where("task_id=?", taskID).Order("priority asc, id asc").Find(&rows).Error; err != nil { return nil, err } out := make([]TaskTierItem, len(rows)) for i, v := range rows { out[i] = TaskTierItem{ID: v.ID, Metric: v.Metric, Operator: v.Operator, Threshold: v.Threshold, Window: v.Window, Repeatable: v.Repeatable, Priority: v.Priority, ExtraParams: v.ExtraParams} } return out, nil } func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error { db := s.repo.GetDbW() if err := db.Where("task_id=?", taskID).Delete(&tcmodel.TaskTier{}).Error; err != nil { return err } for _, t := range tiers { row := &tcmodel.TaskTier{TaskID: taskID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ExtraParams: t.ExtraParams} if err := db.Create(row).Error; err != nil { return err } } return s.invalidateCache(ctx) } func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) { db := s.repo.GetDbR() var rows []tcmodel.TaskReward if err := db.Where("task_id=?", taskID).Order("id asc").Find(&rows).Error; err != nil { return nil, err } out := make([]TaskRewardItem, len(rows)) for i, v := range rows { out[i] = TaskRewardItem{ID: v.ID, TierID: v.TierID, RewardType: v.RewardType, RewardPayload: v.RewardPayload, Quantity: v.Quantity} } return out, nil } func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error { db := s.repo.GetDbW() if err := db.Where("task_id=?", taskID).Delete(&tcmodel.TaskReward{}).Error; err != nil { return err } for _, r := range rewards { row := &tcmodel.TaskReward{TaskID: taskID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity} if err := db.Create(row).Error; err != nil { return err } } return s.invalidateCache(ctx) } func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error { if s.queue != nil { return s.queue.PublishOrderPaid(ctx, userID, orderID) } return s.processOrderPaid(ctx, userID, orderID) } func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error { // 1. 获取订单金额 ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First() if err != nil { return err } amount := ord.ActualAmount // 2. 更新邀请人累计金额并检查是否触发有效邀请 var inviterID int64 var oldAmount int64 var newAmount int64 // 使用事务更新 UserInvites err = s.writeDB.Transaction(func(tx *dao.Query) error { uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First() if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil } return err } inviterID = uInv.InviterID oldAmount = uInv.AccumulatedAmount newAmount = oldAmount + amount updates := map[string]any{ "accumulated_amount": newAmount, } _, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates) return err }) if err != nil { return err } // 3. 处理普通任务 tasks, err := s.getActiveTasks(ctx) if err != nil { return err } for _, t := range tasks { // Filter tasks: Only process if it has order related metrics hasOrderMetric := false for _, tier := range t.Tiers { if tier.Metric == MetricFirstOrder || tier.Metric == MetricOrderCount { hasOrderMetric = true break } } if !hasOrderMetric { continue } var p tcmodel.UserTaskProgress err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error { if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", userID, t.ID).First(&p).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { p = tcmodel.UserTaskProgress{UserID: userID, TaskID: t.ID, OrderCount: 1, FirstOrder: 1} return tx.Create(&p).Error } return err } if err := s.checkAndResetDailyProgress(ctx, tx, &t, &p); err != nil { return err } p.OrderCount++ if p.OrderCount == 1 { p.FirstOrder = 1 } return tx.Save(&p).Error }) if err != nil { s.logger.Error("failed to update progress", zap.Error(err)) continue } if err := s.matchAndGrant(ctx, &t, &p, "order", orderID, fmt.Sprintf("ord:%d", orderID)); err != nil { s.logger.Error("failed to grant reward", zap.Error(err)) } } // 4. 处理邀请人任务 (有效邀请) if inviterID > 0 { for _, t := range tasks { tiers := t.Tiers // 检查该任务是否有 effective_invite_count 类型的 Tier,且是否刚好跨越阈值 triggered := false for _, tier := range tiers { if tier.Metric == MetricEffectiveInviteCount { var extra struct { AmountThreshold int64 `json:"amount_threshold"` } if len(tier.ExtraParams) > 0 { _ = json.Unmarshal([]byte(tier.ExtraParams), &extra) } threshold := extra.AmountThreshold if threshold <= 0 { threshold = 1 // 默认任意金额 } // 如果之前的累计金额未达到阈值,而现在的累计金额达到了阈值,则触发 if oldAmount < threshold && newAmount >= threshold { triggered = true break // 该任务触发一次即可 } } } if triggered { var pInv tcmodel.UserTaskProgress err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error { if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", inviterID, t.ID).First(&pInv).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { pInv = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, EffectiveInviteCount: 1} return tx.Create(&pInv).Error } return err } // 有效邀请通常不重置(除非是每日任务?一般是长期任务) if err := s.checkAndResetDailyProgress(ctx, tx, &t, &pInv); err != nil { return err } pInv.EffectiveInviteCount++ return tx.Save(&pInv).Error }) if err == nil { // 尝试发放奖励 // sourceID 使用 userID (被邀请人ID),eventID 使用特殊前缀防止混淆 _ = s.matchAndGrant(ctx, &t, &pInv, SourceTypeEffectiveInvite, userID, fmt.Sprintf("eff_inv:%d:%d", userID, t.ID)) } else { s.logger.Error("failed to update inviter progress", zap.Error(err)) } } } } return nil } func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error { if s.queue != nil { return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID) } return s.processInviteSuccess(ctx, inviterID, inviteeID) } func (s *service) processInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error { tasks, err := s.getActiveTasks(ctx) if err != nil { return err } for _, t := range tasks { hasInviteMetric := false for _, tier := range t.Tiers { if tier.Metric == MetricInviteCount { hasInviteMetric = true break } } if !hasInviteMetric { continue } var p tcmodel.UserTaskProgress err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error { if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", inviterID, t.ID).First(&p).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { p = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, InviteCount: 1} return tx.Create(&p).Error } return err } if err := s.checkAndResetDailyProgress(ctx, tx, &t, &p); err != nil { return err } p.InviteCount++ return tx.Save(&p).Error }) if err != nil { return err } if err := s.matchAndGrant(ctx, &t, &p, SourceTypeInvite, inviteeID, fmt.Sprintf("inv:%d", inviteeID)); err != nil { return err } } return nil } func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error { isDaily := false if len(t.Tiers) > 0 { for _, tier := range t.Tiers { if tier.Window == WindowDaily { isDaily = true break } } } else { var count int64 if err := tx.Model(&tcmodel.TaskTier{}).Where("task_id = ? AND window = ?", t.ID, WindowDaily).Count(&count).Error; err != nil { return err } isDaily = count > 0 } if !isDaily { return nil } now := time.Now() if p.UpdatedAt.IsZero() { return nil } y1, m1, d1 := p.UpdatedAt.Date() y2, m2, d2 := now.Date() if y1 == y2 && m1 == m2 && d1 == d2 { return nil } p.OrderCount = 0 p.InviteCount = 0 p.FirstOrder = 0 p.ClaimedTiers = datatypes.JSON("[]") return nil } func (s *service) matchAndGrant(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string) error { tiers := t.Tiers if len(tiers) == 0 { if err := s.repo.GetDbR().Where("task_id=?", t.ID).Order("priority asc").Find(&tiers).Error; err != nil { return err } } var claimed []int64 if len(p.ClaimedTiers) > 0 { _ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed) } claimedSet := map[int64]struct{}{} for _, id := range claimed { claimedSet[id] = struct{}{} } for _, tier := range tiers { if _, ok := claimedSet[tier.ID]; ok { continue } hit := false switch tier.Metric { case MetricFirstOrder: hit = p.FirstOrder == 1 case MetricOrderCount: if tier.Operator == OperatorGTE { hit = p.OrderCount >= tier.Threshold } else { hit = p.OrderCount == tier.Threshold } case MetricInviteCount: if tier.Operator == OperatorGTE { hit = p.InviteCount >= tier.Threshold } else { hit = p.InviteCount == tier.Threshold } case MetricEffectiveInviteCount: if tier.Operator == OperatorGTE { hit = p.EffectiveInviteCount >= tier.Threshold } else { hit = p.EffectiveInviteCount == tier.Threshold } } if !hit { continue } if err := s.grantTierRewards(ctx, t.ID, tier.ID, p.UserID, sourceType, sourceID, eventID); err != nil { return err } // 安全更新状态 err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error { var latestP tcmodel.UserTaskProgress if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil { return err } var latestClaimed []int64 if len(latestP.ClaimedTiers) > 0 { _ = json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed) } for _, id := range latestClaimed { if id == tier.ID { return nil } } latestClaimed = append(latestClaimed, tier.ID) b, _ := json.Marshal(latestClaimed) latestP.ClaimedTiers = datatypes.JSON(b) return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error }) if err != nil { return err } } return nil } func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error { var rewards []tcmodel.TaskReward if err := s.repo.GetDbR().Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil { return err } idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID) var exists tcmodel.TaskEventLog if err := s.repo.GetDbR().Where("idempotency_key=?", idk).First(&exists).Error; err == nil && exists.ID > 0 { return nil } tx := s.repo.GetDbW().Begin() for _, r := range rewards { var err error switch r.RewardType { case "points": var pl struct{ Points int64 } _ = json.Unmarshal([]byte(r.RewardPayload), &pl) if pl.Points != 0 { err = s.userSvc.AddPoints(ctx, userID, pl.Points, "", "task_center", nil, nil) } case "coupon": var pl struct { CouponID int64 Quantity int } _ = json.Unmarshal([]byte(r.RewardPayload), &pl) if pl.CouponID > 0 { qty := 1 if pl.Quantity > 0 { qty = pl.Quantity } for i := 0; i < qty; i++ { if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil { break } } } case "item_card": var pl struct { CardID int64 Quantity int } _ = json.Unmarshal([]byte(r.RewardPayload), &pl) if pl.CardID > 0 { if pl.Quantity <= 0 { pl.Quantity = 1 } err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, pl.Quantity) } case "title": var pl struct{ TitleID int64 } _ = json.Unmarshal([]byte(r.RewardPayload), &pl) if pl.TitleID > 0 { err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center") } } if err != nil { tx.Rollback() return err } } if err := tx.Create(&tcmodel.TaskEventLog{EventID: eventID, SourceType: sourceType, SourceID: sourceID, UserID: userID, TaskID: taskID, TierID: tierID, IdempotencyKey: idk, Status: "granted"}).Error; err != nil { tx.Rollback() return err } return tx.Commit().Error }