2026-02-27 00:08:02 +08:00

1530 lines
47 KiB
Go
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package taskcenter
import (
"bindbox-game/internal/pkg/async"
"bindbox-game/internal/pkg/logger"
"bindbox-game/internal/pkg/util/remark"
"bindbox-game/internal/repository/mysql"
"bindbox-game/internal/repository/mysql/dao"
"bindbox-game/internal/repository/mysql/model"
tcmodel "bindbox-game/internal/repository/mysql/task_center"
"context"
"encoding/json"
"errors"
"fmt"
"time"
gamesvc "bindbox-game/internal/service/game"
titlesvc "bindbox-game/internal/service/title"
usersvc "bindbox-game/internal/service/user"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// Service is the task-center API: task, tier and reward administration,
// per-user progress queries and tier claiming, plus the order-paid and
// invite-success event hooks.
type Service interface {
// ListTasks returns one page of tasks together with the total match count.
ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error)
// CreateTask inserts a task and returns its ID.
CreateTask(ctx context.Context, in CreateTaskInput) (int64, error)
// ModifyTask overwrites the editable columns of the task with the given ID.
ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error
// DeleteTask deletes the task with the given ID.
DeleteTask(ctx context.Context, id int64) error
// ListTaskTiers lists the tiers of a task ordered by priority, then ID.
ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error)
// UpsertTaskTiers reconciles the stored tiers of a task with the given list.
UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error
// ListTaskRewards lists the rewards of a task ordered by ID.
ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error)
// UpsertTaskRewards creates, updates and deletes rewards of a task.
UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput, deleteIDs []int64) error
// GetUserProgress computes the user's progress for a task from order and invite data.
GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error)
// ClaimTier validates that the user reached the tier and grants its rewards.
ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error
// OnOrderPaid is the hook for a paid order (queued when a task queue is configured).
OnOrderPaid(ctx context.Context, userID int64, orderID int64) error
// OnInviteSuccess is the hook for a successful invite (queued when a task queue is configured).
OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error
// StartWorker is implemented outside this part of the file.
StartWorker(ctx context.Context)
}
// service is the concrete Service implementation. It bundles the database
// handles, the optional Redis client and task queue, and collaborating services.
type service struct {
	// logger receives warnings, errors and informational events.
	logger logger.CustomLogger
	// readDB / writeDB are generated query objects bound to the read and
	// write database connections respectively.
	readDB, writeDB *dao.Query
	// repo gives access to the raw read/write *gorm.DB handles.
	repo mysql.Repo
	// redis may be nil; Redis-based locking and idempotency are skipped then.
	redis *redis.Client
	// queue is nil when no Redis client was supplied to New.
	queue async.TaskQueue
	// userSvc and titleSvc are collaborating services injected through New.
	userSvc  usersvc.Service
	titleSvc titlesvc.Service
}
// New assembles the task-center Service.
//
// db supplies both the read and the write connection. rdb may be nil, in
// which case no task queue is created and the service's queue field stays nil.
func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service {
	svc := &service{
		logger:   l,
		readDB:   dao.Use(db.GetDbR()),
		writeDB:  dao.Use(db.GetDbW()),
		repo:     db,
		redis:    rdb,
		userSvc:  userSvc,
		titleSvc: titleSvc,
	}
	// A Redis-backed queue is only available when a Redis client was provided.
	if rdb != nil {
		svc.queue = async.NewRedisTaskQueue(rdb)
	}
	return svc
}
// ListTasksInput carries the paging and filter options for ListTasks.
type ListTasksInput struct {
	// Page is 1-based; PageSize is the number of tasks per page.
	Page, PageSize int
	// OnlyActive restricts the result to tasks that are inside their validity period.
	OnlyActive bool
}
// TaskItem is the read model of a task, including its tiers and rewards.
type TaskItem struct {
	ID                int64
	Name, Description string
	Status            int32
	// StartTime and EndTime are int64 timestamps.
	StartTime, EndTime int64
	// Quota is the task-level claim limit and ClaimedCount the number of
	// claims already counted against it.
	Visibility, Quota, ClaimedCount int32
	Tiers                           []TaskTierItem
	Rewards                         []TaskRewardItem
}
// TierProgress records the independent statistics of a single tier, counted
// inside that tier's configured window.
type TierProgress struct {
TierID int64 `json:"tier_id"`
OrderCount int64 `json:"order_count"`
OrderAmount int64 `json:"order_amount"`
InviteCount int64 `json:"invite_count"`
FirstOrder bool `json:"first_order"`
}
// UserProgress is a user's progress for one task: overall totals, the IDs of
// already claimed tiers, and per-activity / per-tier breakdowns.
type UserProgress struct {
TaskID int64 `json:"task_id"`
UserID int64 `json:"user_id"`
OrderCount int64 `json:"order_count"`
OrderAmount int64 `json:"order_amount"`
InviteCount int64 `json:"invite_count"`
FirstOrder bool `json:"first_order"`
ClaimedTiers []int64 `json:"claimed_tiers"`
SubProgress []ActivityProgress `json:"sub_progress"` // independent progress per activity (kept for backward compatibility)
TierProgressMap map[int64]TierProgress `json:"tier_progress_map"` // window-scoped independent progress of each tier, keyed by tier ID
}
// ActivityProgress holds the order count and order amount attributed to a
// single activity.
type ActivityProgress struct {
ActivityID int64 `json:"activity_id"`
OrderCount int64 `json:"order_count"`
OrderAmount int64 `json:"order_amount"`
}
// CreateTaskInput holds the attributes of a task to be created.
type CreateTaskInput struct {
	Name, Description string
	Status            int32
	// StartTime and EndTime are optional and may be nil.
	StartTime, EndTime *time.Time
	Visibility, Quota  int32
}
// ModifyTaskInput holds the new attribute values for an existing task.
type ModifyTaskInput struct {
	Name, Description string
	Status            int32
	// StartTime and EndTime are optional and may be nil.
	StartTime, EndTime *time.Time
	Visibility, Quota  int32
}
// TaskTierInput describes one tier as submitted to UpsertTaskTiers.
type TaskTierInput struct {
	// Metric names the measured quantity; Operator how it is compared
	// against Threshold.
	Metric, Operator string
	Threshold        int64
	// Window selects the statistics window; an empty value is treated as
	// the lifetime window.
	Window               string
	Repeatable, Priority int32
	// ActivityID scopes the tier to one activity when greater than zero.
	ActivityID  int64
	ExtraParams datatypes.JSON
}
// TaskTierItem is the read model of a tier, including its quota usage.
type TaskTierItem struct {
ID int64 `json:"id"`
Metric string `json:"metric"`
Operator string `json:"operator"`
Threshold int64 `json:"threshold"`
Window string `json:"window"`
Repeatable int32 `json:"repeatable"`
Priority int32 `json:"priority"`
ActivityID int64 `json:"activity_id"`
ExtraParams datatypes.JSON `json:"extra_params"`
Quota int32 `json:"quota"` // total quota; 0 means unlimited
ClaimedCount int32 `json:"claimed_count"` // number already claimed
Remaining int32 `json:"remaining"` // remaining claimable amount; -1 means unlimited
}
// TaskRewardInput describes one reward as submitted to UpsertTaskRewards.
// An ID greater than zero addresses an existing reward; otherwise a new
// reward is created.
type TaskRewardInput struct {
ID int64 `json:"id"`
TierID int64 `json:"tier_id"`
RewardType string `json:"reward_type"`
RewardPayload datatypes.JSON `json:"reward_payload"`
Quantity int64 `json:"quantity"`
}
// TaskRewardItem is the read model of a reward. RewardName is a display name
// that ListTasks fills in; ListTaskRewards leaves it empty.
type TaskRewardItem struct {
ID int64 `json:"id"`
TierID int64 `json:"tier_id"`
RewardType string `json:"reward_type"`
RewardPayload datatypes.JSON `json:"reward_payload"`
Quantity int64 `json:"quantity"`
RewardName string `json:"reward_name"`
}
// orderMetricRow is one aggregated (order, activity) result row produced by
// fetchOrderMetricRows: the number of draw logs for that pair, the
// activity's draw price and the order's total amount.
type orderMetricRow struct {
	OrderID, ActivityID, DrawCount, TicketPrice, TotalAmount int64
}
// allowedWindows is the set of window identifiers accepted for a tier.
// normalizeWindow and normalizeWindowStrict consult it to validate input.
var allowedWindows = map[string]struct{}{
WindowDaily: {},
WindowWeekly: {},
WindowMonthly: {},
WindowLifetime: {},
WindowActivityPeriod: {},
WindowSinceRegistration: {},
}
func normalizeWindow(value string) string {
if value == "" {
return WindowLifetime
}
if _, ok := allowedWindows[value]; !ok {
return WindowLifetime
}
return value
}
func normalizeWindowStrict(value string) (string, error) {
if value == "" {
return WindowLifetime, nil
}
if _, ok := allowedWindows[value]; !ok {
return "", fmt.Errorf("invalid window value: %s", value)
}
return value, nil
}
// tierFingerprint builds the identity key of a tier from its metric,
// threshold, activity ID and window, joined by "-". Two tiers with the same
// fingerprint are treated as the same tier by UpsertTaskTiers.
func tierFingerprint(metric string, threshold int64, activityID int64, window string) string {
	const layout = "%s-%d-%d-%s"
	return fmt.Sprintf(
		layout,
		metric,
		threshold,
		activityID,
		window,
	)
}
// fetchOrderMetricRows loads one row per (order, activity) pair for the
// user's orders with status 2 and source_type other than 1, joined through
// their draw logs and issues.
//
// activityIDs restricts the result to those activities when non-empty;
// start and end bound orders.created_at (inclusive) when non-nil.
func (s *service) fetchOrderMetricRows(ctx context.Context, userID int64, activityIDs []int64, start, end *time.Time) ([]orderMetricRow, error) {
	const (
		columns  = "orders.id AS order_id, activity_issues.activity_id AS activity_id, COUNT(activity_draw_logs.id) AS draw_count, COALESCE(activities.price_draw, 0) AS ticket_price, orders.total_amount"
		grouping = "orders.id, activity_issues.activity_id, activities.price_draw, orders.total_amount"
	)

	tx := s.repo.GetDbR().WithContext(ctx).
		Table(model.TableNameOrders).
		Select(columns).
		Joins("JOIN activity_draw_logs ON activity_draw_logs.order_id = orders.id").
		Joins("JOIN activity_issues ON activity_issues.id = activity_draw_logs.issue_id").
		Joins("LEFT JOIN activities ON activities.id = activity_issues.activity_id").
		Where("orders.user_id = ? AND orders.status = 2 AND orders.source_type != 1", userID).
		Group(grouping)

	// Optional filters are appended only when the caller supplied them.
	if len(activityIDs) != 0 {
		tx = tx.Where("activity_issues.activity_id IN ?", activityIDs)
	}
	if start != nil {
		tx = tx.Where("orders.created_at >= ?", *start)
	}
	if end != nil {
		tx = tx.Where("orders.created_at <= ?", *end)
	}

	var result []orderMetricRow
	err := tx.Scan(&result).Error
	if err != nil {
		return nil, err
	}
	return result, nil
}
// calculateEffectiveAmount returns the amount a row contributes to the
// order-amount metric: ticket price times draw count when both are positive,
// otherwise the order's total amount when positive, otherwise 0. Falling
// back with a zero ticket price is logged as a warning when a logger is set.
func (s *service) calculateEffectiveAmount(row orderMetricRow) int64 {
	switch {
	case row.TicketPrice > 0 && row.DrawCount > 0:
		return row.TicketPrice * row.DrawCount
	case row.TotalAmount <= 0:
		return 0
	}

	if row.TicketPrice == 0 && s.logger != nil {
		s.logger.Warn("task center: missing ticket price snapshot, fallback to order amount",
			zap.Int64("order_id", row.OrderID),
			zap.Int64("activity_id", row.ActivityID))
	}
	return row.TotalAmount
}
// aggregateOrderMetrics sums the effective amount of all rows and counts
// orders. With perActivity set every row counts as one order; otherwise
// rows sharing an OrderID are counted once.
func (s *service) aggregateOrderMetrics(rows []orderMetricRow, perActivity bool) (count int64, amount int64) {
	for i := range rows {
		amount += s.calculateEffectiveAmount(rows[i])
	}
	if perActivity {
		return int64(len(rows)), amount
	}

	// Collapse rows of the same order so a multi-activity order counts once.
	distinctOrders := make(map[int64]struct{}, len(rows))
	for i := range rows {
		distinctOrders[rows[i].OrderID] = struct{}{}
	}
	return int64(len(distinctOrders)), amount
}
// countInvites counts the invites made by inviterID.
//
// With activityID > 0 it counts distinct invitees that have an order with
// status 2 and source_type other than 1 containing a draw in that activity,
// with start/end bounding the order creation time. Otherwise it counts
// user_invites rows of the inviter, with start/end bounding the invite
// creation time. Nil bounds are not applied.
func (s *service) countInvites(ctx context.Context, inviterID int64, activityID int64, start, end *time.Time) (int64, error) {
	conn := s.repo.GetDbR().WithContext(ctx)
	var total int64

	if activityID <= 0 {
		q := conn.Model(&model.UserInvites{}).Where("inviter_id = ?", inviterID)
		if start != nil {
			q = q.Where("created_at >= ?", *start)
		}
		if end != nil {
			q = q.Where("created_at <= ?", *end)
		}
		if err := q.Count(&total).Error; err != nil {
			return 0, err
		}
		return total, nil
	}

	stmt := `
SELECT COUNT(DISTINCT ui.invitee_id)
FROM user_invites ui
INNER JOIN orders o ON o.user_id = ui.invitee_id AND o.status = 2 AND o.source_type != 1
INNER JOIN activity_draw_logs dl ON dl.order_id = o.id
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
WHERE ui.inviter_id = ? AND ai.activity_id = ?
`
	params := []any{inviterID, activityID}
	if start != nil {
		stmt += " AND o.created_at >= ?"
		params = append(params, *start)
	}
	if end != nil {
		stmt += " AND o.created_at <= ?"
		params = append(params, *end)
	}
	if err := conn.Raw(stmt, params...).Scan(&total).Error; err != nil {
		return 0, err
	}
	return total, nil
}
// countInvitesForActivities counts the invites made by inviterID without any
// time bound. With no activity IDs it counts all user_invites rows of the
// inviter; otherwise it counts distinct invitees that have an order with
// status 2 and source_type other than 1 containing a draw in one of the
// given activities.
func (s *service) countInvitesForActivities(ctx context.Context, inviterID int64, activityIDs []int64) (int64, error) {
	conn := s.repo.GetDbR().WithContext(ctx)
	var (
		total int64
		err   error
	)

	if len(activityIDs) == 0 {
		err = conn.Model(&model.UserInvites{}).Where("inviter_id = ?", inviterID).Count(&total).Error
	} else {
		err = conn.Raw(`
SELECT COUNT(DISTINCT ui.invitee_id)
FROM user_invites ui
INNER JOIN orders o ON o.user_id = ui.invitee_id AND o.status = 2 AND o.source_type != 1
INNER JOIN activity_draw_logs dl ON dl.order_id = o.id
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
WHERE ui.inviter_id = ? AND ai.activity_id IN (?)
`, inviterID, activityIDs).Scan(&total).Error
	}
	if err != nil {
		return 0, err
	}
	return total, nil
}
// ListTasks returns one page of tasks (newest ID first) with their tiers and
// rewards, plus the total number of tasks matching the filter.
//
// Only tasks with status = 1 and visibility = 1 are listed. When
// in.OnlyActive is set the task must additionally be inside its optional
// [start_time, end_time] range at call time. Page defaults to 1 and PageSize
// to 20 when not positive.
//
// Reward display names for coupon, item-card and title rewards are resolved
// in batch. A failed lookup is logged and leaves the affected names empty;
// it does not fail the listing.
func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) {
	// Bind ctx so cancellation and deadlines reach every query issued below.
	db := s.repo.GetDbR().WithContext(ctx)

	// Both listing modes share the enabled + visible filter; OnlyActive adds
	// the validity-period condition on top of it.
	q := db.Model(&tcmodel.Task{}).Where("status = ? AND visibility = ?", 1, 1)
	if in.OnlyActive {
		now := time.Now()
		q = q.Where("(start_time IS NULL OR start_time <= ?) AND (end_time IS NULL OR end_time >= ?)", now, now)
	}

	if in.PageSize <= 0 {
		in.PageSize = 20
	}
	if in.Page <= 0 {
		in.Page = 1
	}

	if err = q.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	var rows []tcmodel.Task
	err = q.Preload("Tiers", func(db *gorm.DB) *gorm.DB {
		return db.Order("priority asc, id asc")
	}).Preload("Rewards", func(db *gorm.DB) *gorm.DB {
		return db.Order("id asc")
	}).Offset((in.Page - 1) * in.PageSize).Limit(in.PageSize).Order("id desc").Find(&rows).Error
	if err != nil {
		return nil, 0, err
	}

	// rewardRef captures the entity IDs a reward payload may reference.
	type rewardRef struct {
		CouponID int64 `json:"coupon_id"`
		CardID   int64 `json:"card_id"`
		TitleID  int64 `json:"title_id"`
	}
	parseRef := func(payload datatypes.JSON) rewardRef {
		var ref rewardRef
		// Best effort: a malformed payload yields zero IDs and thus an empty name.
		_ = json.Unmarshal([]byte(payload), &ref)
		return ref
	}
	warnLookup := func(rewardType string, lookupErr error) {
		if s.logger != nil {
			s.logger.Warn("task center: reward name lookup failed",
				zap.String("reward_type", rewardType),
				zap.Error(lookupErr))
		}
	}

	// Collect the referenced IDs so names can be fetched with one query per type.
	var couponIDs, itemCardIDs, titleIDs []int64
	for _, v := range rows {
		for _, r := range v.Rewards {
			ref := parseRef(r.RewardPayload)
			switch r.RewardType {
			case RewardTypeCoupon:
				if ref.CouponID > 0 {
					couponIDs = append(couponIDs, ref.CouponID)
				}
			case RewardTypeItemCard:
				if ref.CardID > 0 {
					itemCardIDs = append(itemCardIDs, ref.CardID)
				}
			case RewardTypeTitle:
				if ref.TitleID > 0 {
					titleIDs = append(titleIDs, ref.TitleID)
				}
			}
		}
	}

	couponMap := make(map[int64]string, len(couponIDs))
	if len(couponIDs) > 0 {
		var list []model.SystemCoupons
		if lookupErr := db.Select("id, name").Find(&list, couponIDs).Error; lookupErr != nil {
			warnLookup(RewardTypeCoupon, lookupErr)
		} else {
			for _, v := range list {
				couponMap[v.ID] = v.Name
			}
		}
	}
	itemCardMap := make(map[int64]string, len(itemCardIDs))
	if len(itemCardIDs) > 0 {
		var list []model.SystemItemCards
		if lookupErr := db.Select("id, name").Find(&list, itemCardIDs).Error; lookupErr != nil {
			warnLookup(RewardTypeItemCard, lookupErr)
		} else {
			for _, v := range list {
				itemCardMap[v.ID] = v.Name
			}
		}
	}
	titleMap := make(map[int64]string, len(titleIDs))
	if len(titleIDs) > 0 {
		var list []model.SystemTitles
		if lookupErr := db.Select("id, name").Find(&list, titleIDs).Error; lookupErr != nil {
			warnLookup(RewardTypeTitle, lookupErr)
		} else {
			for _, v := range list {
				titleMap[v.ID] = v.Name
			}
		}
	}

	out := make([]TaskItem, len(rows))
	for i, v := range rows {
		item := TaskItem{
			ID:           v.ID,
			Name:         v.Name,
			Description:  v.Description,
			Status:       v.Status,
			Visibility:   v.Visibility,
			Quota:        v.Quota,
			ClaimedCount: v.ClaimedCount,
		}
		// Unset bounds are reported as 0.
		if v.StartTime != nil {
			item.StartTime = v.StartTime.Unix()
		}
		if v.EndTime != nil {
			item.EndTime = v.EndTime.Unix()
		}

		item.Tiers = make([]TaskTierItem, len(v.Tiers))
		for j, t := range v.Tiers {
			remaining := int32(-1) // -1 means unlimited
			if t.Quota > 0 {
				remaining = t.Quota - t.ClaimedCount
				if remaining < 0 {
					remaining = 0
				}
			}
			item.Tiers[j] = TaskTierItem{
				ID:           t.ID,
				Metric:       t.Metric,
				Operator:     t.Operator,
				Threshold:    t.Threshold,
				Window:       normalizeWindow(t.Window),
				Repeatable:   t.Repeatable,
				Priority:     t.Priority,
				ActivityID:   t.ActivityID,
				ExtraParams:  t.ExtraParams,
				Quota:        t.Quota,
				ClaimedCount: t.ClaimedCount,
				Remaining:    remaining,
			}
		}

		item.Rewards = make([]TaskRewardItem, len(v.Rewards))
		for j, r := range v.Rewards {
			name := ""
			switch r.RewardType {
			case RewardTypePoints:
				name = "积分"
			case RewardTypeGameTicket:
				name = "抽奖券"
			case RewardTypeCoupon:
				name = couponMap[parseRef(r.RewardPayload).CouponID]
			case RewardTypeItemCard:
				name = itemCardMap[parseRef(r.RewardPayload).CardID]
			case RewardTypeTitle:
				name = titleMap[parseRef(r.RewardPayload).TitleID]
			}
			item.Rewards[j] = TaskRewardItem{
				ID:            r.ID,
				TierID:        r.TierID,
				RewardType:    r.RewardType,
				RewardPayload: r.RewardPayload,
				Quantity:      r.Quantity,
				RewardName:    name,
			}
		}
		out[i] = item
	}
	return out, total, nil
}
// computeTimeWindow turns a window identifier into a concrete time range.
// It returns (start, end); a nil bound means that side is unrestricted.
//
//   - daily / weekly / monthly: from local midnight of today, of this week's
//     Monday, or of the first day of this month, up to the current time.
//   - activity period: the task's own start and end, passed through as is
//     (either may be nil).
//   - anything else (lifetime, since-registration, unknown): no restriction.
func computeTimeWindow(window string, taskStart, taskEnd *time.Time) (start *time.Time, end *time.Time) {
	now := time.Now()
	midnight := func(t time.Time) time.Time {
		return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
	}

	var from time.Time
	switch window {
	case WindowDaily:
		from = midnight(now)
	case WindowWeekly:
		// Days elapsed since Monday: Monday -> 0 ... Sunday -> 6.
		sinceMonday := (int(now.Weekday()) + 6) % 7
		from = midnight(now.AddDate(0, 0, -sinceMonday))
	case WindowMonthly:
		from = time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
	case WindowActivityPeriod:
		return taskStart, taskEnd
	default:
		return nil, nil
	}
	return &from, &now
}
// GetUserProgress computes a user's progress for a task in real time from
// order and invite data.
//
// The result contains:
//   - TierProgressMap: per-tier statistics, computed once per distinct
//     (window, activity) combination among the task's tiers and limited to
//     that window's time range;
//   - OrderCount / OrderAmount / InviteCount / FirstOrder: totals without a
//     time bound, restricted to the activities referenced by the tiers when
//     there are any;
//   - SubProgress: per-activity totals, only when tiers reference activities;
//   - ClaimedTiers: the de-duplicated tier IDs stored in the user's progress
//     rows, in the order they are first encountered.
//
// It returns the underlying database error when the task, its tiers or any
// statistic cannot be loaded.
func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) {
	// Bind ctx so cancellation and deadlines reach the direct reads below.
	db := s.repo.GetDbR().WithContext(ctx)

	var task tcmodel.Task
	if err := db.First(&task, taskID).Error; err != nil {
		return nil, err
	}
	var tiers []tcmodel.TaskTier
	if err := db.Where("task_id = ?", taskID).Find(&tiers).Error; err != nil {
		return nil, err
	}

	// Tiers sharing the same window and activity yield identical statistics,
	// so they are grouped and queried once per group.
	type windowGroupKey struct {
		Window     string
		ActivityID int64
	}
	targetActivityIDs := make([]int64, 0)
	seenActivity := make(map[int64]struct{})
	groupMap := make(map[windowGroupKey][]tcmodel.TaskTier)
	for _, t := range tiers {
		if t.ActivityID > 0 {
			if _, ok := seenActivity[t.ActivityID]; !ok {
				seenActivity[t.ActivityID] = struct{}{}
				targetActivityIDs = append(targetActivityIDs, t.ActivityID)
			}
		}
		t.Window = normalizeWindow(t.Window)
		key := windowGroupKey{Window: t.Window, ActivityID: t.ActivityID}
		groupMap[key] = append(groupMap[key], t)
	}

	tierProgressMap := make(map[int64]TierProgress, len(tiers))
	for wk, groupTiers := range groupMap {
		wStart, wEnd := computeTimeWindow(wk.Window, task.StartTime, task.EndTime)

		var activityIDs []int64
		perActivity := wk.ActivityID > 0
		if perActivity {
			activityIDs = []int64{wk.ActivityID}
		}

		rows, err := s.fetchOrderMetricRows(ctx, userID, activityIDs, wStart, wEnd)
		if err != nil {
			return nil, err
		}
		groupOrderCount, groupOrderAmount := s.aggregateOrderMetrics(rows, perActivity)
		groupInviteCount, err := s.countInvites(ctx, userID, wk.ActivityID, wStart, wEnd)
		if err != nil {
			return nil, err
		}

		for _, tier := range groupTiers {
			tierProgressMap[tier.ID] = TierProgress{
				TierID:      tier.ID,
				OrderCount:  groupOrderCount,
				OrderAmount: groupOrderAmount,
				InviteCount: groupInviteCount,
				FirstOrder:  groupOrderCount > 0,
			}
		}
	}

	// Totals without a time bound. An empty activity list applies no
	// activity filter, so no separate branch is needed for that case.
	allRows, err := s.fetchOrderMetricRows(ctx, userID, targetActivityIDs, nil, nil)
	if err != nil {
		return nil, err
	}
	orderCount, orderAmount := s.aggregateOrderMetrics(allRows, false)

	var subProgressList []ActivityProgress
	if len(targetActivityIDs) > 0 {
		subStats := make(map[int64]ActivityProgress, len(targetActivityIDs))
		for _, row := range allRows {
			if row.ActivityID == 0 {
				continue
			}
			stat := subStats[row.ActivityID]
			stat.ActivityID = row.ActivityID
			stat.OrderCount++
			stat.OrderAmount += s.calculateEffectiveAmount(row)
			subStats[row.ActivityID] = stat
		}
		// Emit in tier order; activities without any order are omitted.
		subProgressList = make([]ActivityProgress, 0, len(targetActivityIDs))
		for _, actID := range targetActivityIDs {
			if stat, ok := subStats[actID]; ok {
				subProgressList = append(subProgressList, stat)
			}
		}
	}

	inviteCount, err := s.countInvitesForActivities(ctx, userID, targetActivityIDs)
	if err != nil {
		return nil, err
	}

	var progressRows []tcmodel.UserTaskProgress
	if err := db.Where("user_id=? AND task_id=?", userID, taskID).Find(&progressRows).Error; err != nil {
		return nil, err
	}

	// Merge the claimed tier IDs of all rows. IDs are appended on first
	// sight so the result order is stable instead of following map iteration.
	claimedSet := make(map[int64]struct{})
	allClaimed := make([]int64, 0)
	for _, row := range progressRows {
		if len(row.ClaimedTiers) == 0 {
			continue
		}
		var claimed []int64
		// Best effort: an unparsable claimed_tiers value contributes no IDs.
		_ = json.Unmarshal([]byte(row.ClaimedTiers), &claimed)
		for _, id := range claimed {
			if _, dup := claimedSet[id]; dup {
				continue
			}
			claimedSet[id] = struct{}{}
			allClaimed = append(allClaimed, id)
		}
	}

	return &UserProgress{
		TaskID:          taskID,
		UserID:          userID,
		OrderCount:      orderCount,
		OrderAmount:     orderAmount,
		InviteCount:     inviteCount,
		FirstOrder:      orderCount > 0,
		ClaimedTiers:    allClaimed,
		SubProgress:     subProgressList,
		TierProgressMap: tierProgressMap,
	}, nil
}
// ClaimTier lets a user manually claim the rewards of one tier of a task.
//
// Steps, in order:
//  1. Compute the user's real-time progress and load the tier. The tier must
//     belong to taskID; a tier of another task is reported as not found.
//  2. For activity-scoped tiers with Redis configured, take a short-lived
//     per-user, per-activity lock.
//  3. Check the tier's metric against its threshold, preferring the tier's
//     window-scoped progress.
//  4. For activity-scoped count/amount/invite tiers, make sure the same
//     progress has not already been spent on tiers of other tasks that share
//     the activity and metric.
//  5. Check the task's validity period.
//  6. If the tier is already recorded as claimed, return nil without
//     touching the quota or granting again.
//  7. Consume task quota (when the task has one), grant the rewards, then
//     record the tier as claimed. Quota taken in this call is given back if
//     the grant fails.
//
// It returns an error when validation fails, when the quota is exhausted, or
// when a database, Redis or grant operation fails.
func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error {
	// Bind ctx so cancellation and deadlines reach every query issued below.
	readDB := s.repo.GetDbR().WithContext(ctx)
	writeDB := s.repo.GetDbW().WithContext(ctx)

	// Pre-check: make sure the user really completed this tier.
	progress, err := s.GetUserProgress(ctx, userID, taskID)
	if err != nil {
		return err
	}

	// Load the tier, scoped to the task: without the task_id condition a
	// tier of a different task could be claimed under this taskID.
	var tier tcmodel.TaskTier
	if err := readDB.Where("id = ? AND task_id = ?", tierID, taskID).First(&tier).Error; err != nil {
		return err
	}

	// Tasks that share an activity also share its order pool, so claims of
	// one user on one activity are serialized with a Redis lock.
	if tier.ActivityID > 0 && s.redis != nil {
		claimLockKey := fmt.Sprintf("tc:claim_lock:%d:%d", userID, tier.ActivityID)
		locked, lockErr := s.redis.SetNX(ctx, claimLockKey, "1", 10*time.Second).Result()
		if lockErr != nil {
			s.logger.Error("ClaimTier: Redis lock error", zap.Error(lockErr), zap.Int64("user_id", userID), zap.Int64("activity_id", tier.ActivityID))
			return lockErr
		}
		if !locked {
			return errors.New("操作频繁,请稍后再试")
		}
		defer s.redis.Del(ctx, claimLockKey)
	}

	// Pick the progress values to validate against: the tier's own
	// window-scoped progress first, then the per-activity progress, then
	// the task-wide totals.
	var currentOrderCount, currentOrderAmount, currentInviteCount int64
	if tp, ok := progress.TierProgressMap[tierID]; ok {
		currentOrderCount = tp.OrderCount
		currentOrderAmount = tp.OrderAmount
		currentInviteCount = tp.InviteCount
	} else if tier.ActivityID > 0 {
		for _, sub := range progress.SubProgress {
			if sub.ActivityID == tier.ActivityID {
				currentOrderCount = sub.OrderCount
				currentOrderAmount = sub.OrderAmount
				break
			}
		}
	} else {
		currentOrderCount = progress.OrderCount
		currentOrderAmount = progress.OrderAmount
		currentInviteCount = progress.InviteCount
	}

	// Any operator other than OperatorGTE is treated as exact equality.
	hit := false
	switch tier.Metric {
	case MetricFirstOrder:
		hit = progress.FirstOrder
	case MetricOrderCount:
		if tier.Operator == OperatorGTE {
			hit = currentOrderCount >= tier.Threshold
		} else {
			hit = currentOrderCount == tier.Threshold
		}
	case MetricOrderAmount:
		if tier.Operator == OperatorGTE {
			hit = currentOrderAmount >= tier.Threshold
		} else {
			hit = currentOrderAmount == tier.Threshold
		}
	case MetricInviteCount:
		if tier.Operator == OperatorGTE {
			hit = currentInviteCount >= tier.Threshold
		} else {
			hit = currentInviteCount == tier.Threshold
		}
	}

	// Cross-task validation: several tasks may draw on the same activity's
	// orders, so one batch of orders must not unlock tiers in each of them.
	// Per other task, the largest already-claimed threshold counts as
	// consumed; these are summed, and the current value must cover the
	// consumed amount plus this tier's threshold.
	if tier.ActivityID > 0 && (tier.Metric == MetricOrderCount || tier.Metric == MetricOrderAmount || tier.Metric == MetricInviteCount) {
		// 1. Tiers of other tasks with the same activity and metric.
		var siblingTiers []tcmodel.TaskTier
		if dbErr := readDB.
			Where("activity_id = ? AND metric = ? AND task_id != ?", tier.ActivityID, tier.Metric, taskID).
			Find(&siblingTiers).Error; dbErr != nil {
			return dbErr
		}

		// 2. Distinct task IDs among them.
		siblingTaskIDs := make([]int64, 0, len(siblingTiers))
		siblingTaskSet := make(map[int64]struct{}, len(siblingTiers))
		for _, st := range siblingTiers {
			if _, exists := siblingTaskSet[st.TaskID]; !exists {
				siblingTaskSet[st.TaskID] = struct{}{}
				siblingTaskIDs = append(siblingTaskIDs, st.TaskID)
			}
		}

		// 3. Threshold already consumed by those tasks.
		var consumedThreshold int64
		if len(siblingTaskIDs) > 0 {
			var siblingProgresses []tcmodel.UserTaskProgress
			if dbErr := readDB.
				Where("user_id = ? AND task_id IN ? AND activity_id = 0", userID, siblingTaskIDs).
				Find(&siblingProgresses).Error; dbErr != nil {
				return dbErr
			}

			// taskID -> claimed tier IDs
			taskClaimedTierIDs := make(map[int64][]int64, len(siblingProgresses))
			for _, sp := range siblingProgresses {
				var claimedIDs []int64
				if len(sp.ClaimedTiers) > 0 {
					// Best effort: an unparsable value counts as nothing claimed.
					_ = json.Unmarshal([]byte(sp.ClaimedTiers), &claimedIDs)
				}
				if len(claimedIDs) > 0 {
					taskClaimedTierIDs[sp.TaskID] = claimedIDs
				}
			}

			for _, sibTaskID := range siblingTaskIDs {
				claimedIDs := taskClaimedTierIDs[sibTaskID]
				if len(claimedIDs) == 0 {
					continue
				}
				var claimedThresholds []int64
				if dbErr := readDB.Model(&tcmodel.TaskTier{}).
					Where("id IN ? AND task_id = ? AND activity_id = ? AND metric = ?",
						claimedIDs, sibTaskID, tier.ActivityID, tier.Metric).
					Pluck("threshold", &claimedThresholds).Error; dbErr != nil {
					return dbErr
				}
				// Tiers of one task form a ladder: only the highest claimed
				// threshold counts for that task.
				var maxThreshold int64
				for _, th := range claimedThresholds {
					if th > maxThreshold {
						maxThreshold = th
					}
				}
				consumedThreshold += maxThreshold
			}
		}

		// 4. The current value must cover consumed + required.
		var currentValue int64
		switch tier.Metric {
		case MetricOrderCount:
			currentValue = currentOrderCount
		case MetricOrderAmount:
			currentValue = currentOrderAmount
		case MetricInviteCount:
			currentValue = currentInviteCount
		}
		if currentValue < consumedThreshold+tier.Threshold {
			s.logger.Warn("ClaimTier: cross-task threshold validation failed",
				zap.Int64("user_id", userID),
				zap.Int64("task_id", taskID),
				zap.Int64("tier_id", tierID),
				zap.Int64("current_value", currentValue),
				zap.Int64("consumed_threshold", consumedThreshold),
				zap.Int64("tier_threshold", tier.Threshold),
			)
			return errors.New("订单量不足,已被其他任务消耗,无法领取")
		}
	}

	if !hit {
		return errors.New("任务条件未达成,无法领取")
	}

	// Validate the task's validity period.
	var task tcmodel.Task
	if err := readDB.First(&task, taskID).Error; err != nil {
		return err
	}
	now := time.Now()
	if task.StartTime != nil && now.Before(*task.StartTime) {
		return errors.New("任务尚未开始")
	}
	if task.EndTime != nil && now.After(*task.EndTime) {
		return errors.New("任务已经结束")
	}

	// A tier already recorded as claimed needs no further work. Returning
	// here keeps a repeated request from consuming task quota again.
	for _, id := range progress.ClaimedTiers {
		if id == tierID {
			return nil
		}
	}

	// Consume task-level quota atomically; zero affected rows means the
	// quota is exhausted.
	quotaConsumed := false
	if task.Quota > 0 {
		result := writeDB.Model(&tcmodel.Task{}).
			Where("id = ? AND claimed_count < quota", taskID).
			Update("claimed_count", gorm.Expr("claimed_count + 1"))
		if result.Error != nil {
			return result.Error
		}
		if result.RowsAffected == 0 {
			return errors.New("奖励已领完")
		}
		quotaConsumed = true
		s.logger.Info("ClaimTier: Task quota check passed", zap.Int64("task_id", taskID), zap.Int32("quota", task.Quota))
	}

	// Grant first, mark as claimed afterwards: a failed grant must never
	// leave the tier in a "claimed but not received" state. The call is
	// keyed by "claim:<user>:<task>:<tier>" with source type "manual_claim".
	// NOTE(review): the original comments say grantTierRewards de-duplicates
	// on that key; its body is not in this view, so confirm.
	s.logger.Info("ClaimTier: Starting reward grant...", zap.Int64("user_id", userID), zap.Int64("task_id", taskID), zap.Int64("tier_id", tierID))
	if err := s.grantTierRewards(ctx, taskID, tierID, userID, "manual_claim", 0, fmt.Sprintf("claim:%d:%d:%d", userID, taskID, tierID)); err != nil {
		s.logger.Error("ClaimTier: Reward grant failed", zap.Error(err), zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
		// Give back the quota taken above so a failed grant does not burn it;
		// a retry will consume it again.
		if quotaConsumed {
			if rbErr := writeDB.Model(&tcmodel.Task{}).
				Where("id = ? AND claimed_count > 0", taskID).
				Update("claimed_count", gorm.Expr("claimed_count - 1")).Error; rbErr != nil {
				s.logger.Error("ClaimTier: Failed to release task quota", zap.Error(rbErr), zap.Int64("task_id", taskID))
			}
		}
		return err
	}
	s.logger.Info("ClaimTier: Reward granted successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))

	// Record the tier as claimed, under a row lock on the progress record.
	err = writeDB.Transaction(func(tx *gorm.DB) error {
		var p tcmodel.UserTaskProgress
		findErr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("user_id=? AND task_id=? AND activity_id=0", userID, taskID).
			First(&p).Error
		if findErr != nil {
			if !errors.Is(findErr, gorm.ErrRecordNotFound) {
				return findErr
			}
			// Progress is computed in real time, so the row may not exist
			// yet; create it to hold the claimed state.
			p = tcmodel.UserTaskProgress{
				UserID:       userID,
				TaskID:       taskID,
				ActivityID:   0,
				ClaimedTiers: datatypes.JSON("[]"),
			}
			if createErr := tx.Create(&p).Error; createErr != nil {
				return createErr
			}
		}

		var claimed []int64
		if len(p.ClaimedTiers) > 0 {
			// Best effort: an unparsable value is treated as nothing claimed.
			_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
		}
		for _, id := range claimed {
			if id == tierID {
				return nil // already recorded
			}
		}
		claimed = append(claimed, tierID)
		b, marshalErr := json.Marshal(claimed)
		if marshalErr != nil {
			return marshalErr
		}
		p.ClaimedTiers = datatypes.JSON(b)
		return tx.Model(&tcmodel.UserTaskProgress{}).Where("id=?", p.ID).Update("claimed_tiers", p.ClaimedTiers).Error
	})
	if err != nil {
		s.logger.Error("ClaimTier: Failed to update status", zap.Error(err))
		return err
	}
	s.logger.Info("ClaimTier: Status updated successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
	return nil
}
// CreateTask inserts a new task with a claimed count of zero and returns its
// ID. After a successful insert the task cache is invalidated; if that
// fails, the new ID is still returned together with the invalidation error.
func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) {
	row := &tcmodel.Task{
		Name:         in.Name,
		Description:  in.Description,
		Status:       in.Status,
		StartTime:    in.StartTime,
		EndTime:      in.EndTime,
		Visibility:   in.Visibility,
		Quota:        in.Quota,
		ClaimedCount: 0,
	}
	// Bind ctx so cancellation and deadlines apply to the insert.
	if err := s.repo.GetDbW().WithContext(ctx).Create(row).Error; err != nil {
		return 0, err
	}
	return row.ID, s.invalidateCache(ctx)
}
// ModifyTask overwrites name, description, status, start/end time, visibility
// and quota of the task with the given ID, then invalidates the task cache.
// A map is used for the update so every listed column is written from the
// input, including zero values and nil times.
func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error {
	updates := map[string]any{
		"name":        in.Name,
		"description": in.Description,
		"status":      in.Status,
		"start_time":  in.StartTime,
		"end_time":    in.EndTime,
		"visibility":  in.Visibility,
		"quota":       in.Quota,
	}
	// Bind ctx so cancellation and deadlines apply to the update.
	err := s.repo.GetDbW().WithContext(ctx).
		Model(&tcmodel.Task{}).
		Where("id=?", id).
		Updates(updates).Error
	if err != nil {
		return err
	}
	return s.invalidateCache(ctx)
}
// DeleteTask deletes the task with the given ID and then invalidates the
// task cache.
func (s *service) DeleteTask(ctx context.Context, id int64) error {
	// Bind ctx so cancellation and deadlines apply to the delete.
	err := s.repo.GetDbW().WithContext(ctx).
		Where("id=?", id).
		Delete(&tcmodel.Task{}).Error
	if err != nil {
		return err
	}
	return s.invalidateCache(ctx)
}
// ListTaskTiers returns the tiers of a task ordered by priority, then ID.
// Each item's Window is normalized, and Remaining is the unclaimed part of
// the tier quota (never negative), or -1 when the tier has no quota.
func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) {
	var rows []tcmodel.TaskTier
	// Bind ctx so cancellation and deadlines apply to the query.
	err := s.repo.GetDbR().WithContext(ctx).
		Where("task_id=?", taskID).
		Order("priority asc, id asc").
		Find(&rows).Error
	if err != nil {
		return nil, err
	}

	out := make([]TaskTierItem, len(rows))
	for i, v := range rows {
		remaining := int32(-1) // -1 means unlimited
		if v.Quota > 0 {
			remaining = v.Quota - v.ClaimedCount
			if remaining < 0 {
				remaining = 0
			}
		}
		out[i] = TaskTierItem{
			ID:           v.ID,
			Metric:       v.Metric,
			Operator:     v.Operator,
			Threshold:    v.Threshold,
			Window:       normalizeWindow(v.Window),
			Repeatable:   v.Repeatable,
			Priority:     v.Priority,
			ActivityID:   v.ActivityID,
			ExtraParams:  v.ExtraParams,
			Quota:        v.Quota,
			ClaimedCount: v.ClaimedCount,
			Remaining:    remaining,
		}
	}
	return out, nil
}
// UpsertTaskTiers reconciles the stored tiers of a task with the given list.
//
// Tiers are matched by fingerprint (metric, threshold, activity ID, window):
//   - a submitted tier whose fingerprint already exists updates that row's
//     operator, window, repeatable flag, priority and extra params, keeping
//     its ID and its remaining columns;
//   - a submitted tier with a new fingerprint is inserted;
//   - an existing tier whose fingerprint was not submitted is deleted.
//
// Deletes, updates and inserts run in one transaction. An invalid window
// value aborts the call before anything is written.
func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error {
	// Bind ctx so cancellation and deadlines apply to the read and the transaction.
	db := s.repo.GetDbW().WithContext(ctx)

	// 1. Index the existing tiers by fingerprint.
	var existing []tcmodel.TaskTier
	if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
		return err
	}
	existingMap := make(map[string]tcmodel.TaskTier, len(existing))
	for _, t := range existing {
		t.Window = normalizeWindow(t.Window)
		existingMap[tierFingerprint(t.Metric, t.Threshold, t.ActivityID, t.Window)] = t
	}

	// 2. Split the submitted tiers into updates and inserts.
	var (
		toDelete []int64
		toUpdate []tcmodel.TaskTier
		toCreate []tcmodel.TaskTier
	)
	processedKeys := make(map[string]struct{}, len(tiers))
	for _, t := range tiers {
		window, err := normalizeWindowStrict(t.Window)
		if err != nil {
			return err
		}
		key := tierFingerprint(t.Metric, t.Threshold, t.ActivityID, window)
		old, ok := existingMap[key]
		if !ok {
			toCreate = append(toCreate, tcmodel.TaskTier{
				TaskID:      taskID,
				Metric:      t.Metric,
				Operator:    t.Operator,
				Threshold:   t.Threshold,
				Window:      window,
				Repeatable:  t.Repeatable,
				Priority:    t.Priority,
				ActivityID:  t.ActivityID,
				ExtraParams: t.ExtraParams,
			})
			continue
		}
		// Update in place so the row keeps its ID and claimed count.
		old.Operator = t.Operator
		old.Window = window
		old.Repeatable = t.Repeatable
		old.Priority = t.Priority
		old.ExtraParams = t.ExtraParams
		toUpdate = append(toUpdate, old)
		processedKeys[key] = struct{}{}
	}

	// 3. Everything that was stored but not submitted is removed.
	for key, old := range existingMap {
		if _, ok := processedKeys[key]; !ok {
			toDelete = append(toDelete, old.ID)
		}
	}

	return db.Transaction(func(tx *gorm.DB) error {
		if len(toDelete) > 0 {
			if err := tx.Delete(&tcmodel.TaskTier{}, toDelete).Error; err != nil {
				return err
			}
		}
		for i := range toUpdate {
			if err := tx.Save(&toUpdate[i]).Error; err != nil {
				return err
			}
		}
		if len(toCreate) > 0 {
			if err := tx.Create(&toCreate).Error; err != nil {
				return err
			}
		}
		return nil
	})
}
// ListTaskRewards returns the rewards of a task ordered by ID. RewardName is
// left empty here.
func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) {
	var rows []tcmodel.TaskReward
	// Bind ctx so cancellation and deadlines apply to the query.
	err := s.repo.GetDbR().WithContext(ctx).
		Where("task_id=?", taskID).
		Order("id asc").
		Find(&rows).Error
	if err != nil {
		return nil, err
	}

	out := make([]TaskRewardItem, len(rows))
	for i, v := range rows {
		out[i] = TaskRewardItem{
			ID:            v.ID,
			TierID:        v.TierID,
			RewardType:    v.RewardType,
			RewardPayload: v.RewardPayload,
			Quantity:      v.Quantity,
		}
	}
	return out, nil
}
// UpsertTaskRewards creates, updates and deletes rewards of a task in one
// transaction.
//
//   - A submitted reward with ID > 0 must already belong to the task
//     (otherwise "reward <id> not found" is returned) and is updated.
//   - A submitted reward without an ID is inserted.
//   - With a non-empty deleteIDs, exactly those IDs that belong to the task
//     are deleted. With an empty deleteIDs, every existing reward of the
//     task that was not submitted for update is deleted.
func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput, deleteIDs []int64) error {
	// Bind ctx so cancellation and deadlines apply to the read and the transaction.
	db := s.repo.GetDbW().WithContext(ctx)

	var existing []tcmodel.TaskReward
	if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
		return err
	}
	existingByID := make(map[int64]tcmodel.TaskReward, len(existing))
	for _, r := range existing {
		existingByID[r.ID] = r
	}

	var (
		toUpdate []tcmodel.TaskReward
		toCreate []tcmodel.TaskReward
		toDelete []int64
	)
	seen := make(map[int64]struct{}, len(rewards))
	for _, r := range rewards {
		if r.ID <= 0 {
			toCreate = append(toCreate, tcmodel.TaskReward{
				TaskID:        taskID,
				TierID:        r.TierID,
				RewardType:    r.RewardType,
				RewardPayload: r.RewardPayload,
				Quantity:      r.Quantity,
			})
			continue
		}
		old, ok := existingByID[r.ID]
		if !ok || old.TaskID != taskID {
			return fmt.Errorf("reward %d not found", r.ID)
		}
		old.TierID = r.TierID
		old.RewardType = r.RewardType
		old.RewardPayload = r.RewardPayload
		old.Quantity = r.Quantity
		toUpdate = append(toUpdate, old)
		seen[r.ID] = struct{}{}
	}

	if len(deleteIDs) > 0 {
		// Explicit deletion: IDs that do not belong to this task are ignored.
		for _, id := range deleteIDs {
			if reward, ok := existingByID[id]; ok {
				toDelete = append(toDelete, reward.ID)
			}
		}
	} else {
		// Implicit deletion: drop whatever was not submitted for update.
		for id := range existingByID {
			if _, ok := seen[id]; !ok {
				toDelete = append(toDelete, id)
			}
		}
	}

	return db.Transaction(func(tx *gorm.DB) error {
		if len(toDelete) > 0 {
			if err := tx.Delete(&tcmodel.TaskReward{}, toDelete).Error; err != nil {
				return err
			}
		}
		for i := range toUpdate {
			if err := tx.Save(&toUpdate[i]).Error; err != nil {
				return err
			}
		}
		if len(toCreate) > 0 {
			if err := tx.Create(&toCreate).Error; err != nil {
				return err
			}
		}
		return nil
	})
}
// OnOrderPaid is the entry point for a paid-order event. Without a task queue
// the event is processed inline; otherwise it is published to the queue.
func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error {
	if s.queue == nil {
		return s.processOrderPaid(ctx, userID, orderID)
	}
	return s.queue.PublishOrderPaid(ctx, userID, orderID)
}
// processOrderPaid handles a paid order for the task center.
//
// It loads the order and skips it (returning nil) unless its status is 2.
// With Redis configured, a 24-hour marker per order makes repeated
// deliveries a no-op. It then adds the order's actual amount to the
// accumulated amount of the invite record whose invitee is userID, if one
// exists. Task progress itself is not precomputed here.
//
// If the database update fails, the marker is removed again so that a retry
// of the same order is processed instead of being skipped for 24 hours.
func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error {
	// 1. Load the order.
	ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First()
	if err != nil {
		return err
	}

	// Only paid orders are processed.
	if ord.Status != 2 {
		s.logger.Warn("Order not paid, skip task center", zap.Int64("order_id", orderID), zap.Int32("status", ord.Status))
		return nil
	}

	// Idempotency: the first caller to set the marker processes the order.
	// A Redis failure is returned so the caller can retry or record it.
	var lockKey string
	if s.redis != nil {
		lockKey = fmt.Sprintf("tc:proc:order:%d", orderID)
		set, err := s.redis.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
		if err != nil {
			s.logger.Error("Redis idempotency check failed", zap.Error(err))
			return err
		}
		if !set {
			s.logger.Info("Order already processed by task center", zap.Int64("order_id", orderID))
			return nil
		}
	}

	amount := ord.ActualAmount
	// The parsed remark is not needed in manual-claim mode; the call is kept
	// as in the original code.
	rmk := remark.Parse(ord.Remark)
	_ = rmk

	// 2. Add the amount to the inviter's accumulated amount under a row lock.
	err = s.writeDB.Transaction(func(tx *dao.Query) error {
		uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First()
		if err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				// The user was not invited by anyone: nothing to accumulate.
				return nil
			}
			return err
		}
		updates := map[string]any{
			"accumulated_amount": uInv.AccumulatedAmount + amount,
		}
		_, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates)
		return err
	})
	if err != nil {
		// Release the marker; otherwise a retry would be treated as already
		// processed and the amount would never be accumulated.
		if lockKey != "" {
			if delErr := s.redis.Del(ctx, lockKey).Err(); delErr != nil {
				s.logger.Error("Redis idempotency marker release failed", zap.Error(delErr), zap.Int64("order_id", orderID))
			}
		}
		return err
	}

	// Manual-claim mode: progress is computed from the order table on demand.
	return nil
}
// OnInviteSuccess is the entry point for an invite-success event. Without a
// configured queue the event is processed inline via processInviteSuccess;
// otherwise it is handed to the queue's PublishInviteSuccess.
func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
	if s.queue == nil {
		return s.processInviteSuccess(ctx, inviterID, inviteeID)
	}
	return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID)
}
// processInviteSuccess is a deliberate no-op. In manual-claim mode the invite
// count is computed on demand from the user_invites table, so nothing is
// pre-computed when an invite succeeds. It always returns nil.
func (s *service) processInviteSuccess(ctx context.Context, inviterID, inviteeID int64) error {
	return nil
}
// checkAndResetDailyProgress zeroes the in-memory progress p when task t has at
// least one daily-window tier and p was last updated on an earlier calendar day
// than today.
//
// Whether the task is "daily" is decided from t.Tiers when they are loaded, and
// otherwise by counting matching TaskTier rows through tx. When a reset is due,
// OrderCount, OrderAmount, InviteCount and FirstOrder are set to zero and
// ClaimedTiers to an empty JSON array. This function only mutates p; it does
// not write p back to the database.
//
// A progress record with a zero UpdatedAt is never reset.
func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error {
	isDaily := false
	if len(t.Tiers) > 0 {
		for _, tier := range t.Tiers {
			if tier.Window == WindowDaily {
				isDaily = true
				break
			}
		}
	} else {
		var count int64
		// `window` is a reserved word in MySQL 8.0.2+, so the column name must
		// be quoted or the statement is a syntax error there.
		err := tx.WithContext(ctx).
			Model(&tcmodel.TaskTier{}).
			Where("task_id = ? AND `window` = ?", t.ID, WindowDaily).
			Count(&count).Error
		if err != nil {
			return err
		}
		isDaily = count > 0
	}
	if !isDaily {
		return nil
	}

	if p.UpdatedAt.IsZero() {
		return nil
	}

	now := time.Now()
	// Compare calendar days in one and the same location. UpdatedAt may carry a
	// different *time.Location than time.Now() (for example UTC from the DB
	// driver), in which case comparing the raw Date() values would report a
	// day change that has not happened locally, or miss one that has.
	lastUpdate := p.UpdatedAt.In(now.Location())
	y1, m1, d1 := lastUpdate.Date()
	y2, m2, d2 := now.Date()
	if y1 == y2 && m1 == m2 && d1 == d2 {
		return nil
	}

	s.logger.Info("Daily progress reset",
		zap.Int64("user_id", p.UserID),
		zap.Int64("task_id", t.ID),
		zap.Time("last_update", p.UpdatedAt),
		zap.Time("now", now),
	)
	p.OrderCount = 0
	p.OrderAmount = 0
	p.InviteCount = 0
	p.FirstOrder = 0
	p.ClaimedTiers = datatypes.JSON("[]")
	return nil
}
// matchAndGrantExtended evaluates every not-yet-claimed tier of task t against
// the progress p and, for each tier whose condition is met, grants the tier's
// rewards and records the tier as claimed.
//
// Tiers are taken from t.Tiers, or loaded by task ID ordered by priority when
// t.Tiers is empty. A tier is skipped when it is already listed in
// p.ClaimedTiers, or when it names an ActivityID different from p.ActivityID.
// The tier's metric (first order, order count, order amount, invite count) is
// compared with its threshold using ">=" for OperatorGTE and "==" otherwise.
// For the three order-based metrics, a tier whose extra params set
// new_user_only additionally requires isGlobalNewUser.
//
// After rewards are granted, the claimed list is re-read under a row lock and
// the tier ID appended, so concurrent claims do not overwrite each other.
//
// A claimed_tiers value that cannot be decoded is reported as an error instead
// of being treated as "nothing claimed": treating it as empty would allow
// tiers to be granted again and would overwrite the stored claim history.
//
// sourceType, sourceID and eventID are passed through to grantTierRewards.
func (s *service) matchAndGrantExtended(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string, isGlobalNewUser bool) error {
	tiers := t.Tiers
	if len(tiers) == 0 {
		if err := s.repo.GetDbR().WithContext(ctx).Where("task_id=?", t.ID).Order("priority asc").Find(&tiers).Error; err != nil {
			return err
		}
	}

	var claimed []int64
	if len(p.ClaimedTiers) > 0 {
		if err := json.Unmarshal([]byte(p.ClaimedTiers), &claimed); err != nil {
			return fmt.Errorf("decoding claimed tiers of progress %d: %w", p.ID, err)
		}
	}
	claimedSet := make(map[int64]struct{}, len(claimed))
	for _, id := range claimed {
		claimedSet[id] = struct{}{}
	}

	for _, tier := range tiers {
		if _, ok := claimedSet[tier.ID]; ok {
			continue
		}
		// Activity filter: a tier bound to an activity only matches progress
		// recorded for that same activity.
		if tier.ActivityID > 0 && tier.ActivityID != p.ActivityID {
			continue
		}

		var extra struct {
			NewUserOnly bool `json:"new_user_only"`
		}
		if len(tier.ExtraParams) > 0 {
			// Best effort: malformed extra params leave the defaults in place,
			// but the problem is logged so the tier configuration can be fixed.
			if err := json.Unmarshal([]byte(tier.ExtraParams), &extra); err != nil {
				s.logger.Warn("Invalid tier extra params", zap.Int64("tier_id", tier.ID), zap.Error(err))
			}
		}

		s.logger.Debug("Evaluating tier",
			zap.Int64("tier_id", tier.ID),
			zap.String("metric", tier.Metric),
			zap.Int64("order_count", p.OrderCount),
			zap.Int64("threshold", tier.Threshold),
			zap.Int64("activity_id", tier.ActivityID),
			zap.Int64("my_activity_id", p.ActivityID),
			zap.Int32("first_order", p.FirstOrder),
			zap.Bool("is_global_new", isGlobalNewUser),
		)

		// Copy the values the closures below need so they do not depend on the
		// loop variable.
		tierID, operator, threshold := tier.ID, tier.Operator, tier.Threshold

		// reached applies the tier's operator to a metric value.
		reached := func(value int64) bool {
			if operator == OperatorGTE {
				return value >= threshold
			}
			return value == threshold
		}
		// newUserOK is false only when the tier is restricted to new users and
		// the user is not one.
		newUserOK := !extra.NewUserOnly || isGlobalNewUser

		hit := false
		switch tier.Metric {
		case MetricFirstOrder:
			hit = p.FirstOrder == 1 && newUserOK
		case MetricOrderCount:
			hit = reached(p.OrderCount) && newUserOK
		case MetricOrderAmount:
			hit = reached(p.OrderAmount) && newUserOK
		case MetricInviteCount:
			// Invite tiers are not subject to the new-user restriction.
			hit = reached(p.InviteCount)
		}
		if !hit {
			s.logger.Debug("Tier not hit", zap.Int64("tier_id", tierID))
			continue
		}

		s.logger.Info("Tier Hit! Granting...", zap.Int64("tier_id", tierID), zap.Int64("user_id", p.UserID))
		if err := s.grantTierRewards(ctx, t.ID, tierID, p.UserID, sourceType, sourceID, eventID); err != nil {
			return err
		}

		// Record the claim against the latest stored state, under a row lock.
		err := s.repo.GetDbW().WithContext(ctx).Transaction(func(tx *gorm.DB) error {
			var latestP tcmodel.UserTaskProgress
			if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil {
				return err
			}
			var latestClaimed []int64
			if len(latestP.ClaimedTiers) > 0 {
				if err := json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed); err != nil {
					// Refuse to overwrite a value we could not read.
					return fmt.Errorf("decoding stored claimed tiers of progress %d: %w", p.ID, err)
				}
			}
			for _, id := range latestClaimed {
				if id == tierID {
					// Another writer already recorded this tier.
					return nil
				}
			}
			latestClaimed = append(latestClaimed, tierID)
			b, err := json.Marshal(latestClaimed)
			if err != nil {
				return err
			}
			latestP.ClaimedTiers = datatypes.JSON(b)
			return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error
		})
		if err != nil {
			return err
		}
	}
	return nil
}
// grantTierRewards hands out every reward configured for (taskID, tierID) to
// userID and then writes a TaskEventLog row with status "granted".
//
// Idempotency: a key built from user, task, tier, sourceType and sourceID is
// looked up in TaskEventLog first; when a row with that key exists the call
// returns nil without granting anything. The lookup goes through the write
// handle (presumably GetDbR may be a replica that lags behind — confirm), and
// a lookup failure other than "not found" is returned instead of being treated
// as "not granted yet".
//
// Supported reward types are "points", "coupon", "item_card", "title",
// "game_ticket" and "product"; unknown types are logged and skipped. For types
// that carry a quantity, the TaskReward.Quantity column wins when it is > 1,
// then a positive quantity in the JSON payload, and otherwise 1.
//
// Rewards are granted through other services that do not take part in any
// transaction opened here, so no database transaction is held open while they
// run; the event log row is written with a single insert afterwards. If a
// reward fails, the error is returned and no event log row is written; rewards
// already granted in the same call are not undone.
//
// It returns an error when the tier has no rewards configured.
func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error {
	var rewards []tcmodel.TaskReward
	if err := s.repo.GetDbR().WithContext(ctx).Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil {
		return err
	}
	// No rewards under this tier ID. If the tier itself exists, log a warning
	// to help diagnose a tier-ID mismatch; either way this is an error.
	if len(rewards) == 0 {
		var tier tcmodel.TaskTier
		if err := s.repo.GetDbR().WithContext(ctx).First(&tier, tierID).Error; err == nil {
			s.logger.Warn("Tier ID mismatch or no rewards configured", zap.Int64("tier_id", tierID))
		}
		return errors.New("no rewards configured for this tier")
	}

	idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID)
	var exists tcmodel.TaskEventLog
	lookupErr := s.repo.GetDbW().WithContext(ctx).Where("idempotency_key=?", idk).First(&exists).Error
	switch {
	case lookupErr == nil && exists.ID > 0:
		// Already granted for this source event.
		return nil
	case lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound):
		// Unknown state: granting now could duplicate rewards.
		return lookupErr
	}

	s.logger.Info("Granting rewards for task",
		zap.Int64("user_id", userID),
		zap.Int64("task_id", taskID),
		zap.Int64("tier_id", tierID),
		zap.String("event_id", eventID),
	)

	// resolveQty picks the effective quantity: the column value when > 1, then
	// a positive payload value, then 1.
	resolveQty := func(columnQty int64, payloadQty int) int {
		switch {
		case columnQty > 1:
			return int(columnQty)
		case payloadQty > 0:
			return payloadQty
		default:
			return 1
		}
	}

	for _, r := range rewards {
		var err error
		switch r.RewardType {
		case "points":
			var pl struct {
				Points int64 `json:"points"`
			}
			// Decode errors are tolerated: the payload may be empty when the
			// amount is carried by the Quantity column instead.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			points := pl.Points
			// Fallback: no points in the payload, use the quantity column.
			if points == 0 && r.Quantity > 0 {
				points = r.Quantity
			}
			if points != 0 {
				s.logger.Info("Granting points reward", zap.Int64("user_id", userID), zap.Int64("points", points))
				err = s.userSvc.AddPoints(ctx, userID, points, "task_reward", "task_center", nil, nil)
			}
		case "coupon":
			var pl struct {
				CouponID int64 `json:"coupon_id"`
				Quantity int   `json:"quantity"`
			}
			// Tolerated: an undecodable payload leaves CouponID at 0 and the
			// reward is skipped below.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			if pl.CouponID > 0 {
				qty := resolveQty(r.Quantity, pl.Quantity)
				s.logger.Info("Granting coupon reward", zap.Int64("user_id", userID), zap.Int64("coupon_id", pl.CouponID), zap.Int("quantity", qty))
				// Coupons are issued one at a time; stop at the first failure.
				for i := 0; i < qty; i++ {
					if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil {
						break
					}
				}
			}
		case "item_card":
			var pl struct {
				CardID   int64 `json:"card_id"`
				Quantity int   `json:"quantity"`
			}
			// Tolerated: see the coupon case.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			if pl.CardID > 0 {
				qty := resolveQty(r.Quantity, pl.Quantity)
				s.logger.Info("Granting item card reward", zap.Int64("user_id", userID), zap.Int64("card_id", pl.CardID), zap.Int("quantity", qty))
				err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, qty)
			}
		case "title":
			var pl struct {
				TitleID int64 `json:"title_id"`
			}
			// Tolerated: see the coupon case.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			if pl.TitleID > 0 {
				s.logger.Info("Granting title reward", zap.Int64("user_id", userID), zap.Int64("title_id", pl.TitleID))
				err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center")
			}
		case "game_ticket":
			var pl struct {
				GameCode string `json:"game_code"`
				Amount   int    `json:"amount"`
			}
			// Tolerated: see the coupon case.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			if pl.GameCode != "" {
				amount := resolveQty(r.Quantity, pl.Amount)
				s.logger.Info("Granting game ticket reward", zap.Int64("user_id", userID), zap.String("game_code", pl.GameCode), zap.Int("amount", amount))
				gameSvc := gamesvc.NewTicketService(s.logger, s.repo)
				err = gameSvc.GrantTicket(ctx, userID, pl.GameCode, amount, "task_center", taskID, "任务奖励")
			}
		case "product":
			var pl struct {
				ProductID int64 `json:"product_id"`
				Quantity  int   `json:"quantity"`
			}
			// Tolerated: see the coupon case.
			_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
			if pl.ProductID > 0 {
				qty := resolveQty(r.Quantity, pl.Quantity)
				s.logger.Info("Granting product reward", zap.Int64("user_id", userID), zap.Int64("product_id", pl.ProductID), zap.Int("quantity", qty))
				// Products are granted through the user service.
				_, err = s.userSvc.GrantReward(ctx, userID, usersvc.GrantRewardRequest{
					ProductID: pl.ProductID,
					Quantity:  qty,
					Remark:    "任务奖励",
				})
			}
		default:
			// Nothing was granted, so do not log a success for this entry.
			s.logger.Warn("Unknown reward type", zap.String("type", r.RewardType))
			continue
		}
		if err != nil {
			s.logger.Error("Failed to grant reward",
				zap.String("type", r.RewardType),
				zap.Int64("user_id", userID),
				zap.Error(err))
			return err
		}
		s.logger.Info("Successfully granted reward", zap.String("type", r.RewardType), zap.Int64("user_id", userID))
	}

	// Record the grant; this row is what the idempotency lookup above finds on
	// a repeated call.
	entry := tcmodel.TaskEventLog{
		EventID:        eventID,
		SourceType:     sourceType,
		SourceID:       sourceID,
		UserID:         userID,
		TaskID:         taskID,
		TierID:         tierID,
		IdempotencyKey: idk,
		Status:         "granted",
	}
	return s.repo.GetDbW().WithContext(ctx).Create(&entry).Error
}