929 lines
29 KiB
Go
929 lines
29 KiB
Go
package taskcenter
|
||
|
||
import (
|
||
"bindbox-game/internal/pkg/async"
|
||
"bindbox-game/internal/pkg/logger"
|
||
"bindbox-game/internal/pkg/util/remark"
|
||
"bindbox-game/internal/repository/mysql"
|
||
"bindbox-game/internal/repository/mysql/dao"
|
||
"bindbox-game/internal/repository/mysql/model"
|
||
tcmodel "bindbox-game/internal/repository/mysql/task_center"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
gamesvc "bindbox-game/internal/service/game"
|
||
titlesvc "bindbox-game/internal/service/title"
|
||
usersvc "bindbox-game/internal/service/user"
|
||
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"gorm.io/datatypes"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/clause"
|
||
)
|
||
|
||
type Service interface {
|
||
ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error)
|
||
CreateTask(ctx context.Context, in CreateTaskInput) (int64, error)
|
||
ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error
|
||
DeleteTask(ctx context.Context, id int64) error
|
||
ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error)
|
||
UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error
|
||
ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error)
|
||
UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error
|
||
GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error)
|
||
ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error
|
||
OnOrderPaid(ctx context.Context, userID int64, orderID int64) error
|
||
OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error
|
||
StartWorker(ctx context.Context)
|
||
}
|
||
|
||
type service struct {
|
||
logger logger.CustomLogger
|
||
readDB *dao.Query
|
||
writeDB *dao.Query
|
||
repo mysql.Repo
|
||
redis *redis.Client
|
||
queue async.TaskQueue
|
||
userSvc usersvc.Service
|
||
titleSvc titlesvc.Service
|
||
}
|
||
|
||
func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service {
|
||
var q async.TaskQueue
|
||
if rdb != nil {
|
||
q = async.NewRedisTaskQueue(rdb)
|
||
}
|
||
return &service{
|
||
logger: l,
|
||
readDB: dao.Use(db.GetDbR()),
|
||
writeDB: dao.Use(db.GetDbW()),
|
||
repo: db,
|
||
redis: rdb,
|
||
queue: q,
|
||
userSvc: userSvc,
|
||
titleSvc: titleSvc,
|
||
}
|
||
}
|
||
|
||
// ListTasksInput carries the paging parameters for ListTasks.
// Non-positive values fall back to page 1 and a page size of 20.
type ListTasksInput struct {
	Page, PageSize int
}
|
||
|
||
type TaskItem struct {
|
||
ID int64
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime int64
|
||
EndTime int64
|
||
Visibility int32
|
||
Tiers []TaskTierItem
|
||
Rewards []TaskRewardItem
|
||
}
|
||
|
||
// UserProgress is a user's accumulated counters for one task together with
// the IDs of the tiers already claimed.
type UserProgress struct {
	TaskID, UserID                       int64
	OrderCount, OrderAmount, InviteCount int64
	FirstOrder                           bool
	ClaimedTiers                         []int64
}
|
||
|
||
type CreateTaskInput struct {
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime *time.Time
|
||
EndTime *time.Time
|
||
Visibility int32
|
||
}
|
||
|
||
type ModifyTaskInput struct {
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime *time.Time
|
||
EndTime *time.Time
|
||
Visibility int32
|
||
}
|
||
|
||
type TaskTierInput struct {
|
||
Metric string
|
||
Operator string
|
||
Threshold int64
|
||
Window string
|
||
Repeatable int32
|
||
Priority int32
|
||
ActivityID int64
|
||
ExtraParams datatypes.JSON
|
||
}
|
||
|
||
type TaskTierItem struct {
|
||
ID int64 `json:"id"`
|
||
Metric string `json:"metric"`
|
||
Operator string `json:"operator"`
|
||
Threshold int64 `json:"threshold"`
|
||
Window string `json:"window"`
|
||
Repeatable int32 `json:"repeatable"`
|
||
Priority int32 `json:"priority"`
|
||
ActivityID int64 `json:"activity_id"`
|
||
ExtraParams datatypes.JSON `json:"extra_params"`
|
||
}
|
||
|
||
type TaskRewardInput struct {
|
||
TierID int64 `json:"tier_id"`
|
||
RewardType string `json:"reward_type"`
|
||
RewardPayload datatypes.JSON `json:"reward_payload"`
|
||
Quantity int64 `json:"quantity"`
|
||
}
|
||
|
||
type TaskRewardItem struct {
|
||
ID int64 `json:"id"`
|
||
TierID int64 `json:"tier_id"`
|
||
RewardType string `json:"reward_type"`
|
||
RewardPayload datatypes.JSON `json:"reward_payload"`
|
||
Quantity int64 `json:"quantity"`
|
||
RewardName string `json:"reward_name"`
|
||
}
|
||
|
||
func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.Task
|
||
q := db.Model(&tcmodel.Task{})
|
||
if in.PageSize <= 0 {
|
||
in.PageSize = 20
|
||
}
|
||
if in.Page <= 0 {
|
||
in.Page = 1
|
||
}
|
||
if err = q.Count(&total).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
if err = q.Preload("Tiers", func(db *gorm.DB) *gorm.DB {
|
||
return db.Order("priority asc, id asc")
|
||
}).Preload("Rewards", func(db *gorm.DB) *gorm.DB {
|
||
return db.Order("id asc")
|
||
}).Offset((in.Page - 1) * in.PageSize).Limit(in.PageSize).Order("id desc").Find(&rows).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
out := make([]TaskItem, len(rows))
|
||
|
||
// Pre-calculation: collect IDs for batch lookup
|
||
var couponIDs []int64
|
||
var itemCardIDs []int64
|
||
var titleIDs []int64
|
||
for _, v := range rows {
|
||
for _, r := range v.Rewards {
|
||
switch r.RewardType {
|
||
case RewardTypeCoupon:
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CouponID > 0 {
|
||
couponIDs = append(couponIDs, pl.CouponID)
|
||
}
|
||
case RewardTypeItemCard:
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CardID > 0 {
|
||
itemCardIDs = append(itemCardIDs, pl.CardID)
|
||
}
|
||
case RewardTypeTitle:
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.TitleID > 0 {
|
||
titleIDs = append(titleIDs, pl.TitleID)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Batch fetch names
|
||
couponMap := make(map[int64]string)
|
||
if len(couponIDs) > 0 {
|
||
var list []model.SystemCoupons
|
||
if err := db.Select("id, name").Find(&list, couponIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
couponMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
itemCardMap := make(map[int64]string)
|
||
if len(itemCardIDs) > 0 {
|
||
var list []model.SystemItemCards
|
||
if err := db.Select("id, name").Find(&list, itemCardIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
itemCardMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
titleMap := make(map[int64]string)
|
||
if len(titleIDs) > 0 {
|
||
var list []model.SystemTitles
|
||
if err := db.Select("id, name").Find(&list, titleIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
titleMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
|
||
for i, v := range rows {
|
||
var st, et int64
|
||
if v.StartTime != nil {
|
||
st = v.StartTime.Unix()
|
||
}
|
||
if v.EndTime != nil {
|
||
et = v.EndTime.Unix()
|
||
}
|
||
out[i] = TaskItem{ID: v.ID, Name: v.Name, Description: v.Description, Status: v.Status, StartTime: st, EndTime: et, Visibility: v.Visibility}
|
||
// 填充 Tiers
|
||
out[i].Tiers = make([]TaskTierItem, len(v.Tiers))
|
||
for j, t := range v.Tiers {
|
||
out[i].Tiers[j] = TaskTierItem{ID: t.ID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ActivityID: t.ActivityID, ExtraParams: t.ExtraParams}
|
||
}
|
||
// 填充 Rewards
|
||
out[i].Rewards = make([]TaskRewardItem, len(v.Rewards))
|
||
for j, r := range v.Rewards {
|
||
name := ""
|
||
switch r.RewardType {
|
||
case RewardTypePoints:
|
||
name = "积分"
|
||
case RewardTypeGameTicket:
|
||
name = "抽奖券"
|
||
case RewardTypeCoupon:
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = couponMap[pl.CouponID]
|
||
case RewardTypeItemCard:
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = itemCardMap[pl.CardID]
|
||
case RewardTypeTitle:
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = titleMap[pl.TitleID]
|
||
}
|
||
out[i].Rewards[j] = TaskRewardItem{ID: r.ID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity, RewardName: name}
|
||
}
|
||
}
|
||
return out, total, nil
|
||
}
|
||
|
||
func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) {
|
||
db := s.repo.GetDbR()
|
||
var row tcmodel.UserTaskProgress
|
||
if err := db.Where("user_id=? AND task_id=?", userID, taskID).First(&row).Error; err != nil {
|
||
return &UserProgress{TaskID: taskID, UserID: userID, ClaimedTiers: []int64{}}, nil
|
||
}
|
||
var claimed []int64
|
||
if len(row.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(row.ClaimedTiers), &claimed)
|
||
}
|
||
return &UserProgress{TaskID: taskID, UserID: userID, OrderCount: row.OrderCount, OrderAmount: row.OrderAmount, InviteCount: row.InviteCount, FirstOrder: row.FirstOrder == 1, ClaimedTiers: claimed}, nil
|
||
}
|
||
|
||
func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error {
|
||
// 事务中更新领取状态
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
var p tcmodel.UserTaskProgress
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", userID, taskID).First(&p).Error; err != nil {
|
||
return errors.New("progress_not_found")
|
||
}
|
||
var claimed []int64
|
||
if len(p.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
|
||
}
|
||
for _, id := range claimed {
|
||
if id == tierID {
|
||
return nil // 已领取,跳过
|
||
}
|
||
}
|
||
claimed = append(claimed, tierID)
|
||
b, _ := json.Marshal(claimed)
|
||
p.ClaimedTiers = datatypes.JSON(b)
|
||
return tx.Model(&tcmodel.UserTaskProgress{}).Where("id=?", p.ID).Update("claimed_tiers", p.ClaimedTiers).Error
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 发放奖励
|
||
return s.grantTierRewards(ctx, taskID, tierID, userID, "manual_claim", 0, fmt.Sprintf("claim:%d:%d:%d", userID, taskID, tierID))
|
||
}
|
||
|
||
func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) {
|
||
db := s.repo.GetDbW()
|
||
row := &tcmodel.Task{Name: in.Name, Description: in.Description, Status: in.Status, StartTime: in.StartTime, EndTime: in.EndTime, Visibility: in.Visibility}
|
||
if err := db.Create(row).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return row.ID, s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Model(&tcmodel.Task{}).Where("id=?", id).Updates(map[string]any{"name": in.Name, "description": in.Description, "status": in.Status, "start_time": in.StartTime, "end_time": in.EndTime, "visibility": in.Visibility}).Error; err != nil {
|
||
return err
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) DeleteTask(ctx context.Context, id int64) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Where("id=?", id).Delete(&tcmodel.Task{}).Error; err != nil {
|
||
return err
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.TaskTier
|
||
if err := db.Where("task_id=?", taskID).Order("priority asc, id asc").Find(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
out := make([]TaskTierItem, len(rows))
|
||
for i, v := range rows {
|
||
out[i] = TaskTierItem{ID: v.ID, Metric: v.Metric, Operator: v.Operator, Threshold: v.Threshold, Window: v.Window, Repeatable: v.Repeatable, Priority: v.Priority, ActivityID: v.ActivityID, ExtraParams: v.ExtraParams}
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Where("task_id=?", taskID).Delete(&tcmodel.TaskTier{}).Error; err != nil {
|
||
return err
|
||
}
|
||
for _, t := range tiers {
|
||
row := &tcmodel.TaskTier{TaskID: taskID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ActivityID: t.ActivityID, ExtraParams: t.ExtraParams}
|
||
if err := db.Create(row).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.TaskReward
|
||
if err := db.Where("task_id=?", taskID).Order("id asc").Find(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
out := make([]TaskRewardItem, len(rows))
|
||
for i, v := range rows {
|
||
out[i] = TaskRewardItem{ID: v.ID, TierID: v.TierID, RewardType: v.RewardType, RewardPayload: v.RewardPayload, Quantity: v.Quantity}
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Where("task_id=?", taskID).Delete(&tcmodel.TaskReward{}).Error; err != nil {
|
||
return err
|
||
}
|
||
for _, r := range rewards {
|
||
row := &tcmodel.TaskReward{TaskID: taskID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity}
|
||
if err := db.Create(row).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error {
|
||
if s.queue != nil {
|
||
return s.queue.PublishOrderPaid(ctx, userID, orderID)
|
||
}
|
||
return s.processOrderPaid(ctx, userID, orderID)
|
||
}
|
||
|
||
func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error {
|
||
// 1. 获取订单金额
|
||
ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 1.0 状态校验与幂等性检查
|
||
// 仅处理已支付订单
|
||
if ord.Status != 2 {
|
||
s.logger.Warn("Order not paid, skip task center", zap.Int64("order_id", orderID), zap.Int32("status", ord.Status))
|
||
return nil
|
||
}
|
||
|
||
// 使用 Redis 进行 24 小时内的幂等性拦截,防止重复触发进度计算
|
||
if s.redis != nil {
|
||
lockKey := fmt.Sprintf("tc:proc:order:%d", orderID)
|
||
set, err := s.redis.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
|
||
if err != nil {
|
||
s.logger.Error("Redis idempotency check failed", zap.Error(err))
|
||
// 如果 Redis 异常,为了保险起见,我们可以选择继续处理(由数据库事务保证底层原子性,虽然非严格幂等)
|
||
// 或者返回错误。这里选择返回错误让调用方重试或记录日志。
|
||
return err
|
||
}
|
||
if !set {
|
||
s.logger.Info("Order already processed by task center", zap.Int64("order_id", orderID))
|
||
return nil
|
||
}
|
||
}
|
||
|
||
amount := ord.ActualAmount
|
||
rmk := remark.Parse(ord.Remark)
|
||
activityID := rmk.ActivityID
|
||
|
||
// 1.1 检查是否为全局新用户 (在此订单之前没有已支付订单)
|
||
prevOrders, _ := s.readDB.Orders.WithContext(ctx).Where(
|
||
s.readDB.Orders.UserID.Eq(userID),
|
||
s.readDB.Orders.Status.Eq(2),
|
||
s.readDB.Orders.ID.Neq(orderID),
|
||
).Count()
|
||
isNewUser := prevOrders == 0
|
||
s.logger.Info("Check new user status",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("order_id", orderID),
|
||
zap.Int64("prev_orders", prevOrders),
|
||
zap.Bool("is_new_user", isNewUser),
|
||
)
|
||
|
||
// 2. 更新邀请人累计金额并检查是否触发有效邀请
|
||
var inviterID int64
|
||
var oldAmount int64
|
||
var newAmount int64
|
||
|
||
// 使用事务更新 UserInvites
|
||
err = s.writeDB.Transaction(func(tx *dao.Query) error {
|
||
uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First()
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
inviterID = uInv.InviterID
|
||
oldAmount = uInv.AccumulatedAmount
|
||
newAmount = oldAmount + amount
|
||
|
||
updates := map[string]any{
|
||
"accumulated_amount": newAmount,
|
||
}
|
||
_, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates)
|
||
return err
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 3. 处理普通任务
|
||
tasks, err := s.getActiveTasks(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, t := range tasks {
|
||
// Filter tasks: Only process if it matches ActivityID (0 matches all)
|
||
taskActivityID := int64(0)
|
||
hasOrderMetric := false
|
||
for _, tier := range t.Tiers {
|
||
if tier.ActivityID > 0 {
|
||
taskActivityID = tier.ActivityID
|
||
}
|
||
if tier.Metric == MetricFirstOrder || tier.Metric == MetricOrderCount || tier.Metric == MetricOrderAmount {
|
||
hasOrderMetric = true
|
||
}
|
||
}
|
||
|
||
if !hasOrderMetric {
|
||
continue
|
||
}
|
||
// 如果任务指定了活动ID,且与当前订单活动不符,则跳过
|
||
if taskActivityID > 0 && taskActivityID != activityID {
|
||
continue
|
||
}
|
||
|
||
var p tcmodel.UserTaskProgress
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=? AND activity_id=?", userID, t.ID, taskActivityID).First(&p).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
p = tcmodel.UserTaskProgress{UserID: userID, TaskID: t.ID, ActivityID: taskActivityID, OrderCount: 1, OrderAmount: amount, FirstOrder: 1}
|
||
return tx.Create(&p).Error
|
||
}
|
||
return err
|
||
}
|
||
if err := s.checkAndResetDailyProgress(ctx, tx, &t, &p); err != nil {
|
||
return err
|
||
}
|
||
p.OrderCount++
|
||
p.OrderAmount += amount
|
||
if p.OrderCount == 1 {
|
||
p.FirstOrder = 1
|
||
}
|
||
return tx.Save(&p).Error
|
||
})
|
||
if err != nil {
|
||
s.logger.Error("failed to update progress", zap.Error(err))
|
||
continue
|
||
}
|
||
if err := s.matchAndGrantExtended(ctx, &t, &p, "order", orderID, fmt.Sprintf("ord:%d", orderID), isNewUser); err != nil {
|
||
s.logger.Error("failed to grant reward", zap.Error(err))
|
||
}
|
||
}
|
||
|
||
// 4. 处理邀请人任务 (有效邀请:消费达到金额门槛时触发)
|
||
if inviterID > 0 {
|
||
for _, t := range tasks {
|
||
tiers := t.Tiers
|
||
// 检查该任务是否有 invite_count 类型且设置了消费门槛的 Tier
|
||
triggered := false
|
||
var matchedThreshold int64
|
||
for _, tier := range tiers {
|
||
if tier.Metric == MetricInviteCount {
|
||
var extra struct {
|
||
AmountThreshold int64 `json:"amount_threshold"`
|
||
}
|
||
if len(tier.ExtraParams) > 0 {
|
||
_ = json.Unmarshal([]byte(tier.ExtraParams), &extra)
|
||
}
|
||
// 只有设置了消费门槛的邀请任务,才在消费时触发
|
||
if extra.AmountThreshold > 0 {
|
||
// 如果之前的累计金额未达到阈值,而现在的累计金额达到了阈值,则触发
|
||
if oldAmount < extra.AmountThreshold && newAmount >= extra.AmountThreshold {
|
||
triggered = true
|
||
matchedThreshold = extra.AmountThreshold
|
||
break
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if triggered {
|
||
taskActivityID := int64(0)
|
||
for _, tier := range t.Tiers {
|
||
if tier.ActivityID > 0 {
|
||
taskActivityID = tier.ActivityID
|
||
break
|
||
}
|
||
}
|
||
// 如果任务指定了活动ID,且与当前订单活动不符,则跳过
|
||
if taskActivityID > 0 && taskActivityID != activityID {
|
||
continue
|
||
}
|
||
|
||
var pInv tcmodel.UserTaskProgress
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=? AND activity_id=?", inviterID, t.ID, taskActivityID).First(&pInv).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
pInv = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, ActivityID: taskActivityID, InviteCount: 1}
|
||
return tx.Create(&pInv).Error
|
||
}
|
||
return err
|
||
}
|
||
if err := s.checkAndResetDailyProgress(ctx, tx, &t, &pInv); err != nil {
|
||
return err
|
||
}
|
||
pInv.InviteCount++
|
||
return tx.Save(&pInv).Error
|
||
})
|
||
if err == nil {
|
||
// 尝试发放奖励
|
||
_ = s.matchAndGrantExtended(ctx, &t, &pInv, SourceTypeInvite, userID, fmt.Sprintf("inv_paid:%d:%d:%d", userID, t.ID, matchedThreshold), false)
|
||
} else {
|
||
s.logger.Error("failed to update inviter progress", zap.Error(err))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
|
||
if s.queue != nil {
|
||
return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID)
|
||
}
|
||
return s.processInviteSuccess(ctx, inviterID, inviteeID)
|
||
}
|
||
|
||
func (s *service) processInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
|
||
tasks, err := s.getActiveTasks(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, t := range tasks {
|
||
hasInviteMetric := false
|
||
for _, tier := range t.Tiers {
|
||
if tier.Metric == MetricInviteCount {
|
||
hasInviteMetric = true
|
||
break
|
||
}
|
||
}
|
||
if !hasInviteMetric {
|
||
continue
|
||
}
|
||
|
||
var p tcmodel.UserTaskProgress
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", inviterID, t.ID).First(&p).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
p = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, InviteCount: 1}
|
||
return tx.Create(&p).Error
|
||
}
|
||
return err
|
||
}
|
||
if err := s.checkAndResetDailyProgress(ctx, tx, &t, &p); err != nil {
|
||
return err
|
||
}
|
||
p.InviteCount++
|
||
return tx.Save(&p).Error
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if err := s.matchAndGrantExtended(ctx, &t, &p, SourceTypeInvite, inviteeID, fmt.Sprintf("inv:%d", inviteeID), false); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error {
|
||
isDaily := false
|
||
if len(t.Tiers) > 0 {
|
||
for _, tier := range t.Tiers {
|
||
if tier.Window == WindowDaily {
|
||
isDaily = true
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
var count int64
|
||
if err := tx.Model(&tcmodel.TaskTier{}).Where("task_id = ? AND window = ?", t.ID, WindowDaily).Count(&count).Error; err != nil {
|
||
return err
|
||
}
|
||
isDaily = count > 0
|
||
}
|
||
|
||
if !isDaily {
|
||
return nil
|
||
}
|
||
now := time.Now()
|
||
if p.UpdatedAt.IsZero() {
|
||
return nil
|
||
}
|
||
y1, m1, d1 := p.UpdatedAt.Date()
|
||
y2, m2, d2 := now.Date()
|
||
if y1 == y2 && m1 == m2 && d1 == d2 {
|
||
return nil
|
||
}
|
||
s.logger.Info("Daily progress reset",
|
||
zap.Int64("user_id", p.UserID),
|
||
zap.Int64("task_id", t.ID),
|
||
zap.Time("last_update", p.UpdatedAt),
|
||
zap.Time("now", now),
|
||
)
|
||
p.OrderCount = 0
|
||
p.OrderAmount = 0
|
||
p.InviteCount = 0
|
||
p.FirstOrder = 0
|
||
p.ClaimedTiers = datatypes.JSON("[]")
|
||
return nil
|
||
}
|
||
|
||
func (s *service) matchAndGrantExtended(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string, isGlobalNewUser bool) error {
|
||
tiers := t.Tiers
|
||
if len(tiers) == 0 {
|
||
if err := s.repo.GetDbR().Where("task_id=?", t.ID).Order("priority asc").Find(&tiers).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
var claimed []int64
|
||
if len(p.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
|
||
}
|
||
claimedSet := map[int64]struct{}{}
|
||
for _, id := range claimed {
|
||
claimedSet[id] = struct{}{}
|
||
}
|
||
for _, tier := range tiers {
|
||
if _, ok := claimedSet[tier.ID]; ok {
|
||
continue
|
||
}
|
||
|
||
// 活动维度过滤:如果 Tier 指定了 ActivityID,必须与进度记录中的 ActivityID 一致
|
||
if tier.ActivityID > 0 && tier.ActivityID != p.ActivityID {
|
||
continue
|
||
}
|
||
|
||
var extra struct {
|
||
NewUserOnly bool `json:"new_user_only"`
|
||
}
|
||
if len(tier.ExtraParams) > 0 {
|
||
_ = json.Unmarshal([]byte(tier.ExtraParams), &extra)
|
||
}
|
||
|
||
s.logger.Debug("Evaluating tier",
|
||
zap.Int64("tier_id", tier.ID),
|
||
zap.String("metric", tier.Metric),
|
||
zap.Int64("order_count", p.OrderCount),
|
||
zap.Int64("threshold", tier.Threshold),
|
||
zap.Int64("activity_id", tier.ActivityID),
|
||
zap.Int64("my_activity_id", p.ActivityID),
|
||
zap.Int32("first_order", p.FirstOrder),
|
||
zap.Bool("is_global_new", isGlobalNewUser),
|
||
)
|
||
|
||
hit := false
|
||
switch tier.Metric {
|
||
case MetricFirstOrder:
|
||
hit = p.FirstOrder == 1
|
||
// 如果要求新用户,则必须满足全局新用户条件
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricOrderCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.OrderCount >= tier.Threshold
|
||
} else {
|
||
hit = p.OrderCount == tier.Threshold
|
||
}
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricOrderAmount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.OrderAmount >= tier.Threshold
|
||
} else {
|
||
hit = p.OrderAmount == tier.Threshold
|
||
}
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricInviteCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.InviteCount >= tier.Threshold
|
||
} else {
|
||
hit = p.InviteCount == tier.Threshold
|
||
}
|
||
}
|
||
if !hit {
|
||
s.logger.Debug("Tier not hit", zap.Int64("tier_id", tier.ID))
|
||
continue
|
||
}
|
||
s.logger.Info("Tier Hit! Granting...", zap.Int64("tier_id", tier.ID), zap.Int64("user_id", p.UserID))
|
||
if err := s.grantTierRewards(ctx, t.ID, tier.ID, p.UserID, sourceType, sourceID, eventID); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 安全更新状态
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
var latestP tcmodel.UserTaskProgress
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil {
|
||
return err
|
||
}
|
||
var latestClaimed []int64
|
||
if len(latestP.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed)
|
||
}
|
||
for _, id := range latestClaimed {
|
||
if id == tier.ID {
|
||
return nil
|
||
}
|
||
}
|
||
latestClaimed = append(latestClaimed, tier.ID)
|
||
b, _ := json.Marshal(latestClaimed)
|
||
latestP.ClaimedTiers = datatypes.JSON(b)
|
||
return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error {
|
||
var rewards []tcmodel.TaskReward
|
||
if err := s.repo.GetDbR().Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil {
|
||
return err
|
||
}
|
||
idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID)
|
||
var exists tcmodel.TaskEventLog
|
||
if err := s.repo.GetDbR().Where("idempotency_key=?", idk).First(&exists).Error; err == nil && exists.ID > 0 {
|
||
return nil
|
||
}
|
||
tx := s.repo.GetDbW().Begin()
|
||
s.logger.Info("Granting rewards for task",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("task_id", taskID),
|
||
zap.Int64("tier_id", tierID),
|
||
zap.String("event_id", eventID),
|
||
)
|
||
for _, r := range rewards {
|
||
var err error
|
||
switch r.RewardType {
|
||
case "points":
|
||
var pl struct {
|
||
Points int64 `json:"points"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
points := pl.Points
|
||
// 回退:如果 payload 中没有 points 字段,使用 quantity 字段
|
||
if points == 0 && r.Quantity > 0 {
|
||
points = r.Quantity
|
||
}
|
||
if points != 0 {
|
||
s.logger.Info("Granting points reward", zap.Int64("user_id", userID), zap.Int64("points", points))
|
||
err = s.userSvc.AddPoints(ctx, userID, points, "task_reward", "task_center", nil, nil)
|
||
}
|
||
case "coupon":
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
Quantity int `json:"quantity"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CouponID > 0 {
|
||
qty := 1
|
||
if r.Quantity > 0 {
|
||
qty = int(r.Quantity)
|
||
} else if pl.Quantity > 0 {
|
||
qty = pl.Quantity
|
||
}
|
||
s.logger.Info("Granting coupon reward", zap.Int64("user_id", userID), zap.Int64("coupon_id", pl.CouponID), zap.Int("quantity", qty))
|
||
for i := 0; i < qty; i++ {
|
||
if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
case "item_card":
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
Quantity int `json:"quantity"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CardID > 0 {
|
||
qty := 1
|
||
if r.Quantity > 0 {
|
||
qty = int(r.Quantity)
|
||
} else if pl.Quantity > 0 {
|
||
qty = pl.Quantity
|
||
}
|
||
s.logger.Info("Granting item card reward", zap.Int64("user_id", userID), zap.Int64("card_id", pl.CardID), zap.Int("quantity", qty))
|
||
err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, qty)
|
||
}
|
||
case "title":
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.TitleID > 0 {
|
||
s.logger.Info("Granting title reward", zap.Int64("user_id", userID), zap.Int64("title_id", pl.TitleID))
|
||
err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center")
|
||
}
|
||
case "game_ticket":
|
||
var pl struct {
|
||
GameCode string `json:"game_code"`
|
||
Amount int `json:"amount"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.GameCode != "" && pl.Amount > 0 {
|
||
s.logger.Info("Granting game ticket reward", zap.Int64("user_id", userID), zap.String("game_code", pl.GameCode), zap.Int("amount", pl.Amount))
|
||
gameSvc := gamesvc.NewTicketService(s.logger, s.repo)
|
||
err = gameSvc.GrantTicket(ctx, userID, pl.GameCode, pl.Amount, "task_center", taskID, "任务奖励")
|
||
}
|
||
default:
|
||
s.logger.Warn("Unknown reward type", zap.String("type", r.RewardType))
|
||
}
|
||
if err != nil {
|
||
s.logger.Error("Failed to grant reward",
|
||
zap.String("type", r.RewardType),
|
||
zap.Int64("user_id", userID),
|
||
zap.Error(err))
|
||
tx.Rollback()
|
||
return err
|
||
} else {
|
||
s.logger.Info("Successfully granted reward", zap.String("type", r.RewardType), zap.Int64("user_id", userID))
|
||
}
|
||
}
|
||
if err := tx.Create(&tcmodel.TaskEventLog{EventID: eventID, SourceType: sourceType, SourceID: sourceID, UserID: userID, TaskID: taskID, TierID: tierID, IdempotencyKey: idk, Status: "granted"}).Error; err != nil {
|
||
tx.Rollback()
|
||
return err
|
||
}
|
||
return tx.Commit().Error
|
||
}
|