2026-02-06 16:49:27 +08:00

1112 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package taskcenter
import (
"bindbox-game/internal/pkg/async"
"bindbox-game/internal/pkg/logger"
"bindbox-game/internal/pkg/util/remark"
"bindbox-game/internal/repository/mysql"
"bindbox-game/internal/repository/mysql/dao"
"bindbox-game/internal/repository/mysql/model"
tcmodel "bindbox-game/internal/repository/mysql/task_center"
"context"
"encoding/json"
"errors"
"fmt"
"time"
gamesvc "bindbox-game/internal/service/game"
titlesvc "bindbox-game/internal/service/title"
usersvc "bindbox-game/internal/service/user"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type Service interface {
ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error)
CreateTask(ctx context.Context, in CreateTaskInput) (int64, error)
ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error
DeleteTask(ctx context.Context, id int64) error
ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error)
UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error
ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error)
UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error
GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error)
ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error
OnOrderPaid(ctx context.Context, userID int64, orderID int64) error
OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error
StartWorker(ctx context.Context)
}
type service struct {
logger logger.CustomLogger
readDB *dao.Query
writeDB *dao.Query
repo mysql.Repo
redis *redis.Client
queue async.TaskQueue
userSvc usersvc.Service
titleSvc titlesvc.Service
}
func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service {
var q async.TaskQueue
if rdb != nil {
q = async.NewRedisTaskQueue(rdb)
}
return &service{
logger: l,
readDB: dao.Use(db.GetDbR()),
writeDB: dao.Use(db.GetDbW()),
repo: db,
redis: rdb,
queue: q,
userSvc: userSvc,
titleSvc: titleSvc,
}
}
type ListTasksInput struct {
Page int
PageSize int
}
type TaskItem struct {
ID int64
Name string
Description string
Status int32
StartTime int64
EndTime int64
Visibility int32
Tiers []TaskTierItem
Rewards []TaskRewardItem
}
type UserProgress struct {
TaskID int64
UserID int64
OrderCount int64
OrderAmount int64
InviteCount int64
FirstOrder bool
ClaimedTiers []int64
}
type CreateTaskInput struct {
Name string
Description string
Status int32
StartTime *time.Time
EndTime *time.Time
Visibility int32
}
type ModifyTaskInput struct {
Name string
Description string
Status int32
StartTime *time.Time
EndTime *time.Time
Visibility int32
}
type TaskTierInput struct {
Metric string
Operator string
Threshold int64
Window string
Repeatable int32
Priority int32
ActivityID int64
ExtraParams datatypes.JSON
}
type TaskTierItem struct {
ID int64 `json:"id"`
Metric string `json:"metric"`
Operator string `json:"operator"`
Threshold int64 `json:"threshold"`
Window string `json:"window"`
Repeatable int32 `json:"repeatable"`
Priority int32 `json:"priority"`
ActivityID int64 `json:"activity_id"`
ExtraParams datatypes.JSON `json:"extra_params"`
Quota int32 `json:"quota"` // 总限额0表示不限
ClaimedCount int32 `json:"claimed_count"` // 已领取数
Remaining int32 `json:"remaining"` // 剩余可领,-1表示不限
}
type TaskRewardInput struct {
TierID int64 `json:"tier_id"`
RewardType string `json:"reward_type"`
RewardPayload datatypes.JSON `json:"reward_payload"`
Quantity int64 `json:"quantity"`
}
type TaskRewardItem struct {
ID int64 `json:"id"`
TierID int64 `json:"tier_id"`
RewardType string `json:"reward_type"`
RewardPayload datatypes.JSON `json:"reward_payload"`
Quantity int64 `json:"quantity"`
RewardName string `json:"reward_name"`
}
func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) {
db := s.repo.GetDbR()
var rows []tcmodel.Task
q := db.Model(&tcmodel.Task{})
if in.PageSize <= 0 {
in.PageSize = 20
}
if in.Page <= 0 {
in.Page = 1
}
if err = q.Count(&total).Error; err != nil {
return nil, 0, err
}
if err = q.Preload("Tiers", func(db *gorm.DB) *gorm.DB {
return db.Order("priority asc, id asc")
}).Preload("Rewards", func(db *gorm.DB) *gorm.DB {
return db.Order("id asc")
}).Offset((in.Page - 1) * in.PageSize).Limit(in.PageSize).Order("id desc").Find(&rows).Error; err != nil {
return nil, 0, err
}
out := make([]TaskItem, len(rows))
// Pre-calculation: collect IDs for batch lookup
var couponIDs []int64
var itemCardIDs []int64
var titleIDs []int64
for _, v := range rows {
for _, r := range v.Rewards {
switch r.RewardType {
case RewardTypeCoupon:
var pl struct {
CouponID int64 `json:"coupon_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.CouponID > 0 {
couponIDs = append(couponIDs, pl.CouponID)
}
case RewardTypeItemCard:
var pl struct {
CardID int64 `json:"card_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.CardID > 0 {
itemCardIDs = append(itemCardIDs, pl.CardID)
}
case RewardTypeTitle:
var pl struct {
TitleID int64 `json:"title_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.TitleID > 0 {
titleIDs = append(titleIDs, pl.TitleID)
}
}
}
}
// Batch fetch names
couponMap := make(map[int64]string)
if len(couponIDs) > 0 {
var list []model.SystemCoupons
if err := db.Select("id, name").Find(&list, couponIDs).Error; err == nil {
for _, v := range list {
couponMap[v.ID] = v.Name
}
}
}
itemCardMap := make(map[int64]string)
if len(itemCardIDs) > 0 {
var list []model.SystemItemCards
if err := db.Select("id, name").Find(&list, itemCardIDs).Error; err == nil {
for _, v := range list {
itemCardMap[v.ID] = v.Name
}
}
}
titleMap := make(map[int64]string)
if len(titleIDs) > 0 {
var list []model.SystemTitles
if err := db.Select("id, name").Find(&list, titleIDs).Error; err == nil {
for _, v := range list {
titleMap[v.ID] = v.Name
}
}
}
for i, v := range rows {
var st, et int64
if v.StartTime != nil {
st = v.StartTime.Unix()
}
if v.EndTime != nil {
et = v.EndTime.Unix()
}
out[i] = TaskItem{ID: v.ID, Name: v.Name, Description: v.Description, Status: v.Status, StartTime: st, EndTime: et, Visibility: v.Visibility}
// 填充 Tiers
out[i].Tiers = make([]TaskTierItem, len(v.Tiers))
for j, t := range v.Tiers {
remaining := int32(-1) // -1 表示不限
if t.Quota > 0 {
remaining = t.Quota - t.ClaimedCount
if remaining < 0 {
remaining = 0
}
}
out[i].Tiers[j] = TaskTierItem{ID: t.ID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ActivityID: t.ActivityID, ExtraParams: t.ExtraParams, Quota: t.Quota, ClaimedCount: t.ClaimedCount, Remaining: remaining}
}
// 填充 Rewards
out[i].Rewards = make([]TaskRewardItem, len(v.Rewards))
for j, r := range v.Rewards {
name := ""
switch r.RewardType {
case RewardTypePoints:
name = "积分"
case RewardTypeGameTicket:
name = "抽奖券"
case RewardTypeCoupon:
var pl struct {
CouponID int64 `json:"coupon_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
name = couponMap[pl.CouponID]
case RewardTypeItemCard:
var pl struct {
CardID int64 `json:"card_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
name = itemCardMap[pl.CardID]
case RewardTypeTitle:
var pl struct {
TitleID int64 `json:"title_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
name = titleMap[pl.TitleID]
}
out[i].Rewards[j] = TaskRewardItem{ID: r.ID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity, RewardName: name}
}
}
return out, total, nil
}
func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) {
db := s.repo.GetDbR()
// 3.0 获取任务的 ActivityID 限制
var tiers []tcmodel.TaskTier
// 只需要查 ActivityID > 0 的记录即可判断
db.Select("activity_id").Where("task_id=? AND activity_id > 0", taskID).Limit(1).Find(&tiers)
targetActivityID := int64(0)
if len(tiers) > 0 {
targetActivityID = tiers[0].ActivityID
}
// 1. 实时统计订单数据
// BUG修复排除商城订单(source_type=1),只统计抽奖相关订单
// 通过 activity_draw_logs 和 activity_issues 表关联订单到活动
var orderCount int64
var orderAmount int64
if targetActivityID > 0 {
// 有活动ID限制时通过 activity_draw_logs → activity_issues 关联过滤
// 统计订单数量(使用 WHERE IN 子查询防止 JOIN 导致的重复计数问题)
db.Raw(`
SELECT COUNT(id)
FROM orders
WHERE user_id = ? AND status = 2 AND source_type != 1
AND id IN (
SELECT DISTINCT dl.order_id
FROM activity_draw_logs dl
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
WHERE ai.activity_id = ?
)
`, userID, targetActivityID).Scan(&orderCount)
// 统计订单金额
// BUG修复已解决 JOIN activity_draw_logs 导致金额翻倍的问题
db.Raw(`
SELECT COALESCE(SUM(total_amount), 0)
FROM orders
WHERE user_id = ? AND status = 2 AND source_type != 1
AND id IN (
SELECT DISTINCT dl.order_id
FROM activity_draw_logs dl
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
WHERE ai.activity_id = ?
)
`, userID, targetActivityID).Scan(&orderAmount)
} else {
// 无活动ID限制时统计所有非商城订单
// 增加 EXISTS 检查,确保订单已开奖(有开奖日志)
query := db.Model(&model.Orders{}).Where("user_id = ? AND status = 2 AND source_type != 1", userID)
query.Where("EXISTS (SELECT 1 FROM activity_draw_logs WHERE activity_draw_logs.order_id = orders.id)")
query.Count(&orderCount)
queryAmount := db.Model(&model.Orders{}).Where("user_id = ? AND status = 2 AND source_type != 1", userID)
queryAmount.Where("EXISTS (SELECT 1 FROM activity_draw_logs WHERE activity_draw_logs.order_id = orders.id)")
queryAmount.Select("COALESCE(SUM(total_amount), 0)").Scan(&orderAmount)
}
// 2. 实时统计邀请数据
var inviteCount int64
if targetActivityID > 0 {
// 根据配置计算:如果任务限定了活动,则只统计在该活动中有有效抽奖的人数(有效转化)
db.Raw(`
SELECT COUNT(DISTINCT ui.invitee_id)
FROM user_invites ui
INNER JOIN orders o ON o.user_id = ui.invitee_id AND o.status = 2 AND o.source_type != 1
WHERE ui.inviter_id = ?
AND o.id IN (
SELECT DISTINCT dl.order_id
FROM activity_draw_logs dl
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
WHERE ai.activity_id = ?
)
`, userID, targetActivityID).Scan(&inviteCount)
} else {
// 全量统计(注册即计入):为了与前端“邀请记录”页面的总数对齐(针对全局任务)
db.Model(&model.UserInvites{}).Where("inviter_id = ?", userID).Count(&inviteCount)
}
// 3. 首单判断
hasFirstOrder := orderCount > 0
// 4. 从进度表读取已领取的档位(这部分仍需保留)
var rows []tcmodel.UserTaskProgress
db.Where("user_id=? AND task_id=?", userID, taskID).Find(&rows)
claimedSet := map[int64]struct{}{}
for _, row := range rows {
var claimed []int64
if len(row.ClaimedTiers) > 0 {
_ = json.Unmarshal([]byte(row.ClaimedTiers), &claimed)
}
for _, id := range claimed {
claimedSet[id] = struct{}{}
}
}
allClaimed := make([]int64, 0, len(claimedSet))
for id := range claimedSet {
allClaimed = append(allClaimed, id)
}
return &UserProgress{
TaskID: taskID,
UserID: userID,
OrderCount: orderCount,
OrderAmount: orderAmount,
InviteCount: inviteCount,
FirstOrder: hasFirstOrder,
ClaimedTiers: allClaimed,
}, nil
}
func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error {
// BUG FIX: 增加前置校验,确保用户真的完成了该档位任务
progress, err := s.GetUserProgress(ctx, userID, taskID)
if err != nil {
return err
}
// 获取档位配置
var tier tcmodel.TaskTier
if err := s.repo.GetDbR().First(&tier, tierID).Error; err != nil {
return err
}
// 校验是否达标
hit := false
switch tier.Metric {
case MetricFirstOrder:
hit = progress.FirstOrder
case MetricOrderCount:
if tier.Operator == OperatorGTE {
hit = progress.OrderCount >= tier.Threshold
} else {
hit = progress.OrderCount == tier.Threshold
}
case MetricOrderAmount:
if tier.Operator == OperatorGTE {
hit = progress.OrderAmount >= tier.Threshold
} else {
hit = progress.OrderAmount == tier.Threshold
}
case MetricInviteCount:
if tier.Operator == OperatorGTE {
hit = progress.InviteCount >= tier.Threshold
} else {
hit = progress.InviteCount == tier.Threshold
}
}
if !hit {
return errors.New("任务条件未达成,无法领取")
}
// 2. 限额校验如果设置了限额quota > 0需要原子性地增加 claimed_count
if tier.Quota > 0 {
result := s.repo.GetDbW().Model(&tcmodel.TaskTier{}).
Where("id = ? AND claimed_count < quota", tierID).
Update("claimed_count", gorm.Expr("claimed_count + 1"))
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return errors.New("奖励已领完")
}
s.logger.Info("ClaimTier: Quota check passed", zap.Int64("tier_id", tierID), zap.Int32("quota", tier.Quota))
}
// 3. 先尝试发放奖励 (grantTierRewards 内部有幂等校验)
// IDK logic inside grantTierRewards ensures we don't double grant.
// We use "manual_claim" as source type.
// IMPORTANT: Call this BEFORE updating the progress status to avoid "Claimed but not received" state if grant fails.
s.logger.Info("ClaimTier: Starting reward grant...", zap.Int64("user_id", userID), zap.Int64("task_id", taskID), zap.Int64("tier_id", tierID))
if err := s.grantTierRewards(ctx, taskID, tierID, userID, "manual_claim", 0, fmt.Sprintf("claim:%d:%d:%d", userID, taskID, tierID)); err != nil {
s.logger.Error("ClaimTier: Reward grant failed", zap.Error(err), zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
return err
}
s.logger.Info("ClaimTier: Reward granted successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
// 2. 奖励发放成功后,事务中更新领取状态
err = s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
var p tcmodel.UserTaskProgress
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=? AND activity_id=0", userID, taskID).First(&p).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
// 实时模式兼容:自动创建进度记录用于存储已领取状态
p = tcmodel.UserTaskProgress{
UserID: userID,
TaskID: taskID,
ActivityID: 0,
ClaimedTiers: datatypes.JSON("[]"),
}
if err := tx.Create(&p).Error; err != nil {
return err
}
} else {
return err
}
}
var claimed []int64
if len(p.ClaimedTiers) > 0 {
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
}
for _, id := range claimed {
if id == tierID {
return nil // 已更新状态,无需重复更新
}
}
claimed = append(claimed, tierID)
b, _ := json.Marshal(claimed)
p.ClaimedTiers = datatypes.JSON(b)
return tx.Model(&tcmodel.UserTaskProgress{}).Where("id=?", p.ID).Update("claimed_tiers", p.ClaimedTiers).Error
})
if err != nil {
s.logger.Error("ClaimTier: Failed to update status", zap.Error(err))
return err
}
s.logger.Info("ClaimTier: Status updated successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
return nil
}
func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) {
db := s.repo.GetDbW()
row := &tcmodel.Task{Name: in.Name, Description: in.Description, Status: in.Status, StartTime: in.StartTime, EndTime: in.EndTime, Visibility: in.Visibility}
if err := db.Create(row).Error; err != nil {
return 0, err
}
return row.ID, s.invalidateCache(ctx)
}
func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error {
db := s.repo.GetDbW()
if err := db.Model(&tcmodel.Task{}).Where("id=?", id).Updates(map[string]any{"name": in.Name, "description": in.Description, "status": in.Status, "start_time": in.StartTime, "end_time": in.EndTime, "visibility": in.Visibility}).Error; err != nil {
return err
}
return s.invalidateCache(ctx)
}
func (s *service) DeleteTask(ctx context.Context, id int64) error {
db := s.repo.GetDbW()
if err := db.Where("id=?", id).Delete(&tcmodel.Task{}).Error; err != nil {
return err
}
return s.invalidateCache(ctx)
}
func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) {
db := s.repo.GetDbR()
var rows []tcmodel.TaskTier
if err := db.Where("task_id=?", taskID).Order("priority asc, id asc").Find(&rows).Error; err != nil {
return nil, err
}
out := make([]TaskTierItem, len(rows))
for i, v := range rows {
remaining := int32(-1) // -1 表示不限
if v.Quota > 0 {
remaining = v.Quota - v.ClaimedCount
if remaining < 0 {
remaining = 0
}
}
out[i] = TaskTierItem{ID: v.ID, Metric: v.Metric, Operator: v.Operator, Threshold: v.Threshold, Window: v.Window, Repeatable: v.Repeatable, Priority: v.Priority, ActivityID: v.ActivityID, ExtraParams: v.ExtraParams, Quota: v.Quota, ClaimedCount: v.ClaimedCount, Remaining: remaining}
}
return out, nil
}
func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error {
db := s.repo.GetDbW()
// 1. 获取现有档位
var existing []tcmodel.TaskTier
if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
return err
}
existingMap := make(map[string]tcmodel.TaskTier)
for _, t := range existing {
// 使用指标+阈值+活动作为业务指纹
key := fmt.Sprintf("%s-%d-%d", t.Metric, t.Threshold, t.ActivityID)
existingMap[key] = t
}
var toDelete []int64
var toUpdate []tcmodel.TaskTier
var toCreate []tcmodel.TaskTier
processedKeys := make(map[string]struct{})
for _, t := range tiers {
key := fmt.Sprintf("%s-%d-%d", t.Metric, t.Threshold, t.ActivityID)
if old, ok := existingMap[key]; ok {
// 更新现有记录,保留 ID
old.Operator = t.Operator
old.Window = t.Window
old.Repeatable = t.Repeatable
old.Priority = t.Priority
old.ExtraParams = t.ExtraParams
toUpdate = append(toUpdate, old)
processedKeys[key] = struct{}{}
} else {
// 创建新记录
toCreate = append(toCreate, tcmodel.TaskTier{
TaskID: taskID,
Metric: t.Metric,
Operator: t.Operator,
Threshold: t.Threshold,
Window: t.Window,
Repeatable: t.Repeatable,
Priority: t.Priority,
ActivityID: t.ActivityID,
ExtraParams: t.ExtraParams,
})
}
}
for key, old := range existingMap {
if _, ok := processedKeys[key]; !ok {
toDelete = append(toDelete, old.ID)
}
}
return db.Transaction(func(tx *gorm.DB) error {
if len(toDelete) > 0 {
if err := tx.Delete(&tcmodel.TaskTier{}, toDelete).Error; err != nil {
return err
}
}
for _, t := range toUpdate {
if err := tx.Save(&t).Error; err != nil {
return err
}
}
if len(toCreate) > 0 {
if err := tx.Create(&toCreate).Error; err != nil {
return err
}
}
return nil
})
}
func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) {
db := s.repo.GetDbR()
var rows []tcmodel.TaskReward
if err := db.Where("task_id=?", taskID).Order("id asc").Find(&rows).Error; err != nil {
return nil, err
}
out := make([]TaskRewardItem, len(rows))
for i, v := range rows {
out[i] = TaskRewardItem{ID: v.ID, TierID: v.TierID, RewardType: v.RewardType, RewardPayload: v.RewardPayload, Quantity: v.Quantity}
}
return out, nil
}
func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error {
db := s.repo.GetDbW()
// 同理优化 ID 稳定性
var existing []tcmodel.TaskReward
if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
return err
}
existingMap := make(map[string]tcmodel.TaskReward)
for _, r := range existing {
// 奖励类型+档位 ID 作为指纹
key := fmt.Sprintf("%d-%s", r.TierID, r.RewardType)
existingMap[key] = r
}
var toDelete []int64
var toUpdate []tcmodel.TaskReward
var toCreate []tcmodel.TaskReward
processedKeys := make(map[string]struct{})
for _, r := range rewards {
key := fmt.Sprintf("%d-%s", r.TierID, r.RewardType)
if old, ok := existingMap[key]; ok {
old.RewardPayload = r.RewardPayload
old.Quantity = r.Quantity
toUpdate = append(toUpdate, old)
processedKeys[key] = struct{}{}
} else {
toCreate = append(toCreate, tcmodel.TaskReward{
TaskID: taskID,
TierID: r.TierID,
RewardType: r.RewardType,
RewardPayload: r.RewardPayload,
Quantity: r.Quantity,
})
}
}
for key, old := range existingMap {
if _, ok := processedKeys[key]; !ok {
toDelete = append(toDelete, old.ID)
}
}
return db.Transaction(func(tx *gorm.DB) error {
if len(toDelete) > 0 {
if err := tx.Delete(&tcmodel.TaskReward{}, toDelete).Error; err != nil {
return err
}
}
for _, r := range toUpdate {
if err := tx.Save(&r).Error; err != nil {
return err
}
}
if len(toCreate) > 0 {
if err := tx.Create(&toCreate).Error; err != nil {
return err
}
}
return nil
})
}
func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error {
if s.queue != nil {
return s.queue.PublishOrderPaid(ctx, userID, orderID)
}
return s.processOrderPaid(ctx, userID, orderID)
}
func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error {
// 1. 获取订单金额
ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First()
if err != nil {
return err
}
// 1.0 状态校验与幂等性检查
// 仅处理已支付订单
if ord.Status != 2 {
s.logger.Warn("Order not paid, skip task center", zap.Int64("order_id", orderID), zap.Int32("status", ord.Status))
return nil
}
// 使用 Redis 进行 24 小时内的幂等性拦截,防止重复触发进度计算
if s.redis != nil {
lockKey := fmt.Sprintf("tc:proc:order:%d", orderID)
set, err := s.redis.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
if err != nil {
s.logger.Error("Redis idempotency check failed", zap.Error(err))
// 如果 Redis 异常,为了保险起见,我们可以选择继续处理(由数据库事务保证底层原子性,虽然非严格幂等)
// 或者返回错误。这里选择返回错误让调用方重试或记录日志。
return err
}
if !set {
s.logger.Info("Order already processed by task center", zap.Int64("order_id", orderID))
return nil
}
}
amount := ord.ActualAmount
rmk := remark.Parse(ord.Remark)
_ = rmk // 手动模式下不再需要 activityID
// 2. 更新邀请人累计金额(用于 GetUserProgress 中判断有效邀请)
// 使用事务更新 UserInvites
err = s.writeDB.Transaction(func(tx *dao.Query) error {
uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}
return err
}
newAmount := uInv.AccumulatedAmount + amount
updates := map[string]any{
"accumulated_amount": newAmount,
}
_, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates)
return err
})
if err != nil {
return err
}
// 手动领取模式:进度从订单表实时统计,此处不再预计算
// 仅保留邀请金额累计,用于判断有效邀请
return nil
}
func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
if s.queue != nil {
return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID)
}
return s.processInviteSuccess(ctx, inviterID, inviteeID)
}
func (s *service) processInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
// 手动领取模式:邀请数从 user_invites 表实时统计,此处不再预计算
return nil
}
func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error {
isDaily := false
if len(t.Tiers) > 0 {
for _, tier := range t.Tiers {
if tier.Window == WindowDaily {
isDaily = true
break
}
}
} else {
var count int64
if err := tx.Model(&tcmodel.TaskTier{}).Where("task_id = ? AND window = ?", t.ID, WindowDaily).Count(&count).Error; err != nil {
return err
}
isDaily = count > 0
}
if !isDaily {
return nil
}
now := time.Now()
if p.UpdatedAt.IsZero() {
return nil
}
y1, m1, d1 := p.UpdatedAt.Date()
y2, m2, d2 := now.Date()
if y1 == y2 && m1 == m2 && d1 == d2 {
return nil
}
s.logger.Info("Daily progress reset",
zap.Int64("user_id", p.UserID),
zap.Int64("task_id", t.ID),
zap.Time("last_update", p.UpdatedAt),
zap.Time("now", now),
)
p.OrderCount = 0
p.OrderAmount = 0
p.InviteCount = 0
p.FirstOrder = 0
p.ClaimedTiers = datatypes.JSON("[]")
return nil
}
func (s *service) matchAndGrantExtended(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string, isGlobalNewUser bool) error {
tiers := t.Tiers
if len(tiers) == 0 {
if err := s.repo.GetDbR().Where("task_id=?", t.ID).Order("priority asc").Find(&tiers).Error; err != nil {
return err
}
}
var claimed []int64
if len(p.ClaimedTiers) > 0 {
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
}
claimedSet := map[int64]struct{}{}
for _, id := range claimed {
claimedSet[id] = struct{}{}
}
for _, tier := range tiers {
if _, ok := claimedSet[tier.ID]; ok {
continue
}
// 活动维度过滤:如果 Tier 指定了 ActivityID必须与进度记录中的 ActivityID 一致
if tier.ActivityID > 0 && tier.ActivityID != p.ActivityID {
continue
}
var extra struct {
NewUserOnly bool `json:"new_user_only"`
}
if len(tier.ExtraParams) > 0 {
_ = json.Unmarshal([]byte(tier.ExtraParams), &extra)
}
s.logger.Debug("Evaluating tier",
zap.Int64("tier_id", tier.ID),
zap.String("metric", tier.Metric),
zap.Int64("order_count", p.OrderCount),
zap.Int64("threshold", tier.Threshold),
zap.Int64("activity_id", tier.ActivityID),
zap.Int64("my_activity_id", p.ActivityID),
zap.Int32("first_order", p.FirstOrder),
zap.Bool("is_global_new", isGlobalNewUser),
)
hit := false
switch tier.Metric {
case MetricFirstOrder:
hit = p.FirstOrder == 1
// 如果要求新用户,则必须满足全局新用户条件
if extra.NewUserOnly && !isGlobalNewUser {
hit = false
}
case MetricOrderCount:
if tier.Operator == OperatorGTE {
hit = p.OrderCount >= tier.Threshold
} else {
hit = p.OrderCount == tier.Threshold
}
if extra.NewUserOnly && !isGlobalNewUser {
hit = false
}
case MetricOrderAmount:
if tier.Operator == OperatorGTE {
hit = p.OrderAmount >= tier.Threshold
} else {
hit = p.OrderAmount == tier.Threshold
}
if extra.NewUserOnly && !isGlobalNewUser {
hit = false
}
case MetricInviteCount:
if tier.Operator == OperatorGTE {
hit = p.InviteCount >= tier.Threshold
} else {
hit = p.InviteCount == tier.Threshold
}
}
if !hit {
s.logger.Debug("Tier not hit", zap.Int64("tier_id", tier.ID))
continue
}
s.logger.Info("Tier Hit! Granting...", zap.Int64("tier_id", tier.ID), zap.Int64("user_id", p.UserID))
if err := s.grantTierRewards(ctx, t.ID, tier.ID, p.UserID, sourceType, sourceID, eventID); err != nil {
return err
}
// 安全更新状态
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
var latestP tcmodel.UserTaskProgress
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil {
return err
}
var latestClaimed []int64
if len(latestP.ClaimedTiers) > 0 {
_ = json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed)
}
for _, id := range latestClaimed {
if id == tier.ID {
return nil
}
}
latestClaimed = append(latestClaimed, tier.ID)
b, _ := json.Marshal(latestClaimed)
latestP.ClaimedTiers = datatypes.JSON(b)
return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error
})
if err != nil {
return err
}
}
return nil
}
func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error {
var rewards []tcmodel.TaskReward
if err := s.repo.GetDbR().Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil {
return err
}
// 容错处理:如果直接根据 tier_id 找不到奖励,可能是 ID 变更导致的。
// 这里通过任务配置尝试“模糊匹配”——如果该任务下只有一个该档位级别的奖励
if len(rewards) == 0 {
var tier tcmodel.TaskTier
if err := s.repo.GetDbR().First(&tier, tierID).Error; err == nil {
s.logger.Warn("Tier ID mismatch or no rewards configured", zap.Int64("tier_id", tierID))
}
return errors.New("no rewards configured for this tier")
}
idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID)
var exists tcmodel.TaskEventLog
if err := s.repo.GetDbR().Where("idempotency_key=?", idk).First(&exists).Error; err == nil && exists.ID > 0 {
return nil
}
tx := s.repo.GetDbW().Begin()
s.logger.Info("Granting rewards for task",
zap.Int64("user_id", userID),
zap.Int64("task_id", taskID),
zap.Int64("tier_id", tierID),
zap.String("event_id", eventID),
)
for _, r := range rewards {
var err error
switch r.RewardType {
case "points":
var pl struct {
Points int64 `json:"points"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
points := pl.Points
// 回退:如果 payload 中没有 points 字段,使用 quantity 字段
if points == 0 && r.Quantity > 0 {
points = r.Quantity
}
if points != 0 {
s.logger.Info("Granting points reward", zap.Int64("user_id", userID), zap.Int64("points", points))
err = s.userSvc.AddPoints(ctx, userID, points, "task_reward", "task_center", nil, nil)
}
case "coupon":
var pl struct {
CouponID int64 `json:"coupon_id"`
Quantity int `json:"quantity"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.CouponID > 0 {
// BUG 修复:优先使用 r.Quantity仅当 > 1 时),否则使用 payload否则默认 1
qty := 1
if r.Quantity > 1 {
qty = int(r.Quantity)
} else if pl.Quantity > 0 {
qty = pl.Quantity
} else if r.Quantity == 1 {
qty = 1 // 显式设置为 1
}
s.logger.Info("Granting coupon reward", zap.Int64("user_id", userID), zap.Int64("coupon_id", pl.CouponID), zap.Int("quantity", qty))
for i := 0; i < qty; i++ {
if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil {
break
}
}
}
case "item_card":
var pl struct {
CardID int64 `json:"card_id"`
Quantity int `json:"quantity"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.CardID > 0 {
// BUG 修复:优先使用 r.Quantity仅当 > 1 时),否则使用 payload否则默认 1
qty := 1
if r.Quantity > 1 {
qty = int(r.Quantity)
} else if pl.Quantity > 0 {
qty = pl.Quantity
} else if r.Quantity == 1 {
qty = 1 // 显式设置为 1
}
s.logger.Info("Granting item card reward", zap.Int64("user_id", userID), zap.Int64("card_id", pl.CardID), zap.Int("quantity", qty))
err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, qty)
}
case "title":
var pl struct {
TitleID int64 `json:"title_id"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.TitleID > 0 {
s.logger.Info("Granting title reward", zap.Int64("user_id", userID), zap.Int64("title_id", pl.TitleID))
err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center")
}
case "game_ticket":
var pl struct {
GameCode string `json:"game_code"`
Amount int `json:"amount"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.GameCode != "" {
// BUG 修复:增加对 r.Quantity 的支持,统一数量解析逻辑
amount := 1
if r.Quantity > 1 {
amount = int(r.Quantity)
} else if pl.Amount > 0 {
amount = pl.Amount
} else if r.Quantity == 1 {
amount = 1
}
s.logger.Info("Granting game ticket reward", zap.Int64("user_id", userID), zap.String("game_code", pl.GameCode), zap.Int("amount", amount))
gameSvc := gamesvc.NewTicketService(s.logger, s.repo)
err = gameSvc.GrantTicket(ctx, userID, pl.GameCode, amount, "task_center", taskID, "任务奖励")
}
case "product":
var pl struct {
ProductID int64 `json:"product_id"`
Quantity int `json:"quantity"`
}
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
if pl.ProductID > 0 {
// BUG 修复:优先使用 r.Quantity仅当 > 1 时),否则使用 payload否则默认 1
qty := 1
if r.Quantity > 1 {
qty = int(r.Quantity)
} else if pl.Quantity > 0 {
qty = pl.Quantity
} else if r.Quantity == 1 {
qty = 1 // 显式设置为 1
}
s.logger.Info("Granting product reward", zap.Int64("user_id", userID), zap.Int64("product_id", pl.ProductID), zap.Int("quantity", qty))
// 通过用户服务发放商品(创建待发货订单)
_, err = s.userSvc.GrantReward(ctx, userID, usersvc.GrantRewardRequest{
ProductID: pl.ProductID,
Quantity: qty,
Remark: "任务奖励",
})
}
default:
s.logger.Warn("Unknown reward type", zap.String("type", r.RewardType))
}
if err != nil {
s.logger.Error("Failed to grant reward",
zap.String("type", r.RewardType),
zap.Int64("user_id", userID),
zap.Error(err))
tx.Rollback()
return err
} else {
s.logger.Info("Successfully granted reward", zap.String("type", r.RewardType), zap.Int64("user_id", userID))
}
}
if err := tx.Create(&tcmodel.TaskEventLog{EventID: eventID, SourceType: sourceType, SourceID: sourceID, UserID: userID, TaskID: taskID, TierID: tierID, IdempotencyKey: idk, Status: "granted"}).Error; err != nil {
tx.Rollback()
return err
}
return tx.Commit().Error
}