邹方成 a7a0f639e1 feat: 新增取消发货功能并优化任务中心
fix: 修复微信通知字段截断导致的编码错误
feat: 添加有效邀请相关字段和任务中心常量
refactor: 重构一番赏奖品格位逻辑
perf: 优化道具卡列表聚合显示
docs: 更新项目说明文档和API文档
test: 添加字符串截断工具测试
2025-12-23 22:26:07 +08:00

680 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package taskcenter
import (
"bindbox-game/internal/pkg/async"
"bindbox-game/internal/pkg/logger"
"bindbox-game/internal/repository/mysql"
"bindbox-game/internal/repository/mysql/dao"
tcmodel "bindbox-game/internal/repository/mysql/task_center"
"context"
"encoding/json"
"errors"
"fmt"
"time"
titlesvc "bindbox-game/internal/service/title"
usersvc "bindbox-game/internal/service/user"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// Service is the task-center API: administration of tasks, tiers and rewards,
// per-user progress queries and claims, and the event hooks that advance
// progress when an order is paid or an invite succeeds.
type Service interface {
// ListTasks returns one page of tasks ordered by id descending, each with its
// tiers and rewards, together with the total number of tasks.
ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error)
// CreateTask inserts a task and returns its ID.
CreateTask(ctx context.Context, in CreateTaskInput) (int64, error)
// ModifyTask overwrites name, description, status, start/end time and
// visibility of the task with the given id.
ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error
// DeleteTask deletes the task row with the given id.
DeleteTask(ctx context.Context, id int64) error
// ListTaskTiers returns the tiers of a task ordered by priority, then id.
ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error)
// UpsertTaskTiers replaces all tiers of a task with the given list.
UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error
// ListTaskRewards returns the rewards of a task ordered by id.
ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error)
// UpsertTaskRewards replaces all rewards of a task with the given list.
UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error
// GetUserProgress returns the user's counters and claimed tier IDs for a task.
// A user without a progress row gets an empty progress and a nil error.
GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error)
// ClaimTier records tierID as claimed in the user's progress row for the task;
// claiming an already-claimed tier is a no-op.
ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error
// OnOrderPaid reports a paid order. The event is published to the async queue
// when one is configured, otherwise it is processed synchronously.
OnOrderPaid(ctx context.Context, userID int64, orderID int64) error
// OnInviteSuccess reports a successful invite. The event is published to the
// async queue when one is configured, otherwise it is processed synchronously.
OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error
// StartWorker is implemented outside this part of the file; presumably it
// consumes the queued events — TODO confirm against the implementation.
StartWorker(ctx context.Context)
}
// service is the Service implementation backed by MySQL, with an optional
// Redis-based task queue.
type service struct {
	// logger receives the error reports emitted while processing events.
	logger logger.CustomLogger
	// readDB and writeDB are the generated query builders bound to the
	// handles returned by repo.GetDbR() and repo.GetDbW() respectively.
	readDB, writeDB *dao.Query
	// repo gives direct access to the underlying gorm handles.
	repo mysql.Repo
	// redis is the client passed to New; nil when Redis is not configured.
	redis *redis.Client
	// queue stays nil when redis is nil, in which case events are processed
	// synchronously.
	queue async.TaskQueue
	// userSvc grants points, coupons and item cards.
	userSvc usersvc.Service
	// titleSvc grants titles.
	titleSvc titlesvc.Service
}
// New builds the task-center Service.
//
// db supplies both the read and the write handle. rdb may be nil: the service
// then has no queue, and OnOrderPaid / OnInviteSuccess process events
// synchronously instead of publishing them.
func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service {
	svc := &service{
		logger:   l,
		readDB:   dao.Use(db.GetDbR()),
		writeDB:  dao.Use(db.GetDbW()),
		repo:     db,
		redis:    rdb,
		userSvc:  userSvc,
		titleSvc: titleSvc,
	}
	// Only assign the queue when Redis exists, so svc.queue remains a true nil
	// interface and the `s.queue != nil` checks elsewhere stay meaningful.
	if rdb != nil {
		svc.queue = async.NewRedisTaskQueue(rdb)
	}
	return svc
}
// ListTasksInput carries the paging parameters for ListTasks.
type ListTasksInput struct {
	// Page is the 1-based page number and PageSize the number of rows per
	// page. ListTasks substitutes 1 and 20 respectively for non-positive values.
	Page, PageSize int
}
// TaskItem is the view of a task returned by ListTasks.
type TaskItem struct {
	// ID is the task's primary key.
	ID int64
	// Name and Description are the task's display texts.
	Name, Description string
	// Status is copied verbatim from the stored task.
	Status int32
	// StartTime and EndTime are Unix seconds; 0 means the stored value was nil.
	StartTime, EndTime int64
	// Visibility is copied verbatim from the stored task.
	Visibility int32
	// Tiers are ordered by priority, then id.
	Tiers []TaskTierItem
	// Rewards are ordered by id.
	Rewards []TaskRewardItem
}
// UserProgress is a user's progress on one task as returned by GetUserProgress.
type UserProgress struct {
	// TaskID and UserID identify the progress record.
	TaskID, UserID int64
	// OrderCount and InviteCount are the stored event counters.
	OrderCount, InviteCount int64
	// FirstOrder is true when the stored first-order flag equals 1.
	FirstOrder bool
	// ClaimedTiers lists the IDs of the tiers already claimed.
	ClaimedTiers []int64
}
// CreateTaskInput holds the attributes of a task to be created by CreateTask.
type CreateTaskInput struct {
	// Name and Description are the task's display texts.
	Name, Description string
	// Status is stored as given.
	Status int32
	// StartTime and EndTime bound the task; nil leaves the bound unset.
	StartTime, EndTime *time.Time
	// Visibility is stored as given.
	Visibility int32
}
// ModifyTaskInput holds the full set of task attributes written by ModifyTask.
// Every field is written unconditionally, so zero values and nil times
// overwrite whatever is stored.
type ModifyTaskInput struct {
	// Name and Description are the task's display texts.
	Name, Description string
	// Status is stored as given.
	Status int32
	// StartTime and EndTime bound the task; nil clears the stored bound.
	StartTime, EndTime *time.Time
	// Visibility is stored as given.
	Visibility int32
}
// TaskTierInput describes one tier to be stored by UpsertTaskTiers.
type TaskTierInput struct {
	// Metric selects the progress counter the tier is evaluated against, and
	// Operator the comparison: OperatorGTE means ">=", anything else "==".
	Metric, Operator string
	// Threshold is the value the selected counter is compared with.
	Threshold int64
	// Window equal to WindowDaily marks the task as resetting each day.
	Window string
	// Repeatable and Priority are stored as given; tiers are listed by
	// ascending Priority. Repeatable is not read by the matching code in this
	// part of the file — NOTE(review): confirm where it takes effect.
	Repeatable, Priority int32
	// ExtraParams is free-form JSON; effective-invite tiers read an
	// "amount_threshold" key from it.
	ExtraParams datatypes.JSON
}
// TaskTierItem is the JSON view of a stored task tier, as returned by
// ListTaskTiers and embedded in TaskItem.
type TaskTierItem struct {
// ID is the tier's primary key; claimed tiers are tracked by this ID.
ID int64 `json:"id"`
// Metric selects the progress counter the tier is evaluated against.
Metric string `json:"metric"`
// Operator is the comparison: OperatorGTE means ">=", anything else "==".
Operator string `json:"operator"`
// Threshold is the value the selected counter is compared with.
Threshold int64 `json:"threshold"`
// Window equal to WindowDaily marks the task as resetting each day.
Window string `json:"window"`
// Repeatable is copied verbatim from the stored tier.
Repeatable int32 `json:"repeatable"`
// Priority is the primary sort key of tier lists (ascending).
Priority int32 `json:"priority"`
// ExtraParams is free-form JSON; effective-invite tiers read an
// "amount_threshold" key from it.
ExtraParams datatypes.JSON `json:"extra_params"`
}
// TaskRewardInput describes one reward to be stored by UpsertTaskRewards.
type TaskRewardInput struct {
// TierID is the tier whose completion grants this reward.
TierID int64 `json:"tier_id"`
// RewardType selects the grant path; the granting code handles "points",
// "coupon", "item_card" and "title" and silently skips any other value.
RewardType string `json:"reward_type"`
// RewardPayload is JSON whose shape depends on RewardType.
RewardPayload datatypes.JSON `json:"reward_payload"`
// Quantity is stored as given. NOTE(review): the granting code visible in
// this file takes quantities from RewardPayload, not from this field.
Quantity int64 `json:"quantity"`
}
// TaskRewardItem is the JSON view of a stored task reward, as returned by
// ListTaskRewards and embedded in TaskItem.
type TaskRewardItem struct {
// ID is the reward's primary key.
ID int64 `json:"id"`
// TierID is the tier whose completion grants this reward.
TierID int64 `json:"tier_id"`
// RewardType selects the grant path ("points", "coupon", "item_card", "title").
RewardType string `json:"reward_type"`
// RewardPayload is JSON whose shape depends on RewardType.
RewardPayload datatypes.JSON `json:"reward_payload"`
// Quantity is copied verbatim from the stored reward.
Quantity int64 `json:"quantity"`
}
// ListTasks returns one page of tasks, newest id first, each with its tiers
// (ordered by priority, then id) and rewards (ordered by id), plus the total
// number of tasks. Non-positive Page / PageSize fall back to 1 / 20.
func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) {
	page, size := in.Page, in.PageSize
	if size <= 0 {
		size = 20
	}
	if page <= 0 {
		page = 1
	}

	query := s.repo.GetDbR().Model(&tcmodel.Task{})
	if err = query.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	tierOrder := func(db *gorm.DB) *gorm.DB { return db.Order("priority asc, id asc") }
	rewardOrder := func(db *gorm.DB) *gorm.DB { return db.Order("id asc") }

	var tasks []tcmodel.Task
	err = query.
		Preload("Tiers", tierOrder).
		Preload("Rewards", rewardOrder).
		Offset((page - 1) * size).
		Limit(size).
		Order("id desc").
		Find(&tasks).Error
	if err != nil {
		return nil, 0, err
	}

	result := make([]TaskItem, 0, len(tasks))
	for _, task := range tasks {
		item := TaskItem{
			ID:          task.ID,
			Name:        task.Name,
			Description: task.Description,
			Status:      task.Status,
			Visibility:  task.Visibility,
		}
		// Unset time bounds are reported as 0.
		if task.StartTime != nil {
			item.StartTime = task.StartTime.Unix()
		}
		if task.EndTime != nil {
			item.EndTime = task.EndTime.Unix()
		}

		// Copy tiers.
		item.Tiers = make([]TaskTierItem, 0, len(task.Tiers))
		for _, tier := range task.Tiers {
			item.Tiers = append(item.Tiers, TaskTierItem{
				ID:          tier.ID,
				Metric:      tier.Metric,
				Operator:    tier.Operator,
				Threshold:   tier.Threshold,
				Window:      tier.Window,
				Repeatable:  tier.Repeatable,
				Priority:    tier.Priority,
				ExtraParams: tier.ExtraParams,
			})
		}

		// Copy rewards.
		item.Rewards = make([]TaskRewardItem, 0, len(task.Rewards))
		for _, reward := range task.Rewards {
			item.Rewards = append(item.Rewards, TaskRewardItem{
				ID:            reward.ID,
				TierID:        reward.TierID,
				RewardType:    reward.RewardType,
				RewardPayload: reward.RewardPayload,
				Quantity:      reward.Quantity,
			})
		}

		result = append(result, item)
	}
	return result, total, nil
}
// GetUserProgress returns the user's progress on a task.
//
// A user without a progress row gets an empty progress (zero counters, empty
// ClaimedTiers) and a nil error. Any other lookup failure, or an undecodable
// claimed_tiers column, is returned as an error instead of being presented as
// "no progress". ClaimedTiers is never nil, so it always serializes as a JSON
// array.
func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) {
	var row tcmodel.UserTaskProgress
	err := s.repo.GetDbR().WithContext(ctx).
		Where("user_id=? AND task_id=?", userID, taskID).
		First(&row).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return &UserProgress{TaskID: taskID, UserID: userID, ClaimedTiers: []int64{}}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("load progress (user %d, task %d): %w", userID, taskID, err)
	}

	claimed := []int64{}
	if len(row.ClaimedTiers) > 0 {
		if err := json.Unmarshal([]byte(row.ClaimedTiers), &claimed); err != nil {
			return nil, fmt.Errorf("decode claimed tiers (user %d, task %d): %w", userID, taskID, err)
		}
		// A stored JSON null decodes to a nil slice; normalize it.
		if claimed == nil {
			claimed = []int64{}
		}
	}

	return &UserProgress{
		TaskID:       taskID,
		UserID:       userID,
		OrderCount:   row.OrderCount,
		InviteCount:  row.InviteCount,
		FirstOrder:   row.FirstOrder == 1,
		ClaimedTiers: claimed,
	}, nil
}
// ClaimTier records tierID as claimed in the user's progress row for taskID.
//
// The row is locked FOR UPDATE inside a transaction. Claiming a tier that is
// already recorded is a no-op. A missing row yields the error
// "progress_not_found"; other lookup failures are returned as such rather
// than being reported as "not found". If the stored claimed_tiers value cannot
// be decoded the call fails, because writing back a list rebuilt from nothing
// would erase the existing claims.
//
// NOTE(review): this method only appends the ID. It does not check that the
// tier belongs to the task or that its threshold is met, and it grants no
// rewards — confirm that callers rely on matchAndGrant for that.
func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error {
	return s.repo.GetDbW().WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		var p tcmodel.UserTaskProgress
		err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("user_id=? AND task_id=?", userID, taskID).
			First(&p).Error
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return errors.New("progress_not_found")
		}
		if err != nil {
			return fmt.Errorf("load progress (user %d, task %d): %w", userID, taskID, err)
		}

		var claimed []int64
		if len(p.ClaimedTiers) > 0 {
			if err := json.Unmarshal([]byte(p.ClaimedTiers), &claimed); err != nil {
				return fmt.Errorf("decode claimed tiers (progress %d): %w", p.ID, err)
			}
		}
		for _, id := range claimed {
			if id == tierID {
				return nil
			}
		}

		claimed = append(claimed, tierID)
		b, err := json.Marshal(claimed)
		if err != nil {
			return err
		}
		return tx.Model(&tcmodel.UserTaskProgress{}).
			Where("id=?", p.ID).
			Update("claimed_tiers", datatypes.JSON(b)).Error
	})
}
// CreateTask inserts a new task and returns its ID. After a successful insert
// the task cache is invalidated; if that fails, the new ID is still returned
// alongside the invalidation error.
func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) {
	task := tcmodel.Task{
		Name:        in.Name,
		Description: in.Description,
		Status:      in.Status,
		StartTime:   in.StartTime,
		EndTime:     in.EndTime,
		Visibility:  in.Visibility,
	}
	if res := s.repo.GetDbW().Create(&task); res.Error != nil {
		return 0, res.Error
	}
	return task.ID, s.invalidateCache(ctx)
}
// ModifyTask overwrites the editable columns of task id with the values from
// in, then invalidates the task cache. A map is used for the update so that
// zero values and nil times are written too.
func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error {
	fields := map[string]any{
		"name":        in.Name,
		"description": in.Description,
		"status":      in.Status,
		"start_time":  in.StartTime,
		"end_time":    in.EndTime,
		"visibility":  in.Visibility,
	}
	res := s.repo.GetDbW().Model(&tcmodel.Task{}).Where("id=?", id).Updates(fields)
	if res.Error != nil {
		return res.Error
	}
	return s.invalidateCache(ctx)
}
// DeleteTask deletes the task with the given id and then invalidates the task
// cache. Only the task row is targeted here; tiers and rewards are not touched
// by this method.
func (s *service) DeleteTask(ctx context.Context, id int64) error {
	res := s.repo.GetDbW().Where("id=?", id).Delete(&tcmodel.Task{})
	if res.Error != nil {
		return res.Error
	}
	return s.invalidateCache(ctx)
}
// ListTaskTiers returns the tiers of taskID ordered by priority, then id.
// The result is an empty, non-nil slice when the task has no tiers.
func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) {
	var tiers []tcmodel.TaskTier
	res := s.repo.GetDbR().
		Where("task_id=?", taskID).
		Order("priority asc, id asc").
		Find(&tiers)
	if res.Error != nil {
		return nil, res.Error
	}

	items := make([]TaskTierItem, 0, len(tiers))
	for _, tier := range tiers {
		items = append(items, TaskTierItem{
			ID:          tier.ID,
			Metric:      tier.Metric,
			Operator:    tier.Operator,
			Threshold:   tier.Threshold,
			Window:      tier.Window,
			Repeatable:  tier.Repeatable,
			Priority:    tier.Priority,
			ExtraParams: tier.ExtraParams,
		})
	}
	return items, nil
}
// UpsertTaskTiers replaces all tiers of taskID with the given list.
//
// The delete and the inserts run in one transaction, so a failed insert rolls
// everything back instead of leaving the task with some or none of its tiers.
// The task cache is invalidated only after the transaction has committed.
//
// NOTE(review): tiers are re-created rather than updated in place, so they
// presumably receive new IDs; rewards (tier_id) and users' claimed_tiers refer
// to tiers by ID — confirm callers re-save rewards afterwards.
func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error {
	err := s.repo.GetDbW().WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		if err := tx.Where("task_id=?", taskID).Delete(&tcmodel.TaskTier{}).Error; err != nil {
			return err
		}
		for _, t := range tiers {
			row := &tcmodel.TaskTier{TaskID: taskID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: t.Window, Repeatable: t.Repeatable, Priority: t.Priority, ExtraParams: t.ExtraParams}
			if err := tx.Create(row).Error; err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return err
	}
	return s.invalidateCache(ctx)
}
// ListTaskRewards returns the rewards of taskID ordered by id.
// The result is an empty, non-nil slice when the task has no rewards.
func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) {
	var rewards []tcmodel.TaskReward
	res := s.repo.GetDbR().
		Where("task_id=?", taskID).
		Order("id asc").
		Find(&rewards)
	if res.Error != nil {
		return nil, res.Error
	}

	items := make([]TaskRewardItem, 0, len(rewards))
	for _, reward := range rewards {
		items = append(items, TaskRewardItem{
			ID:            reward.ID,
			TierID:        reward.TierID,
			RewardType:    reward.RewardType,
			RewardPayload: reward.RewardPayload,
			Quantity:      reward.Quantity,
		})
	}
	return items, nil
}
// UpsertTaskRewards replaces all rewards of taskID with the given list.
//
// The delete and the inserts run in one transaction, so a failed insert rolls
// everything back instead of leaving the task with some or none of its
// rewards. The task cache is invalidated only after the transaction has
// committed.
func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput) error {
	err := s.repo.GetDbW().WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		if err := tx.Where("task_id=?", taskID).Delete(&tcmodel.TaskReward{}).Error; err != nil {
			return err
		}
		for _, r := range rewards {
			row := &tcmodel.TaskReward{TaskID: taskID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity}
			if err := tx.Create(row).Error; err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return err
	}
	return s.invalidateCache(ctx)
}
// OnOrderPaid reports that userID paid orderID. Without a queue the event is
// processed synchronously; otherwise it is only published to the queue.
func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error {
	if s.queue == nil {
		return s.processOrderPaid(ctx, userID, orderID)
	}
	return s.queue.PublishOrderPaid(ctx, userID, orderID)
}
// processOrderPaid applies a paid order to the task center:
//
//  1. loads the order to learn the amount actually paid;
//  2. if the payer was invited by someone, adds that amount to the invite's
//     accumulated amount (row locked FOR UPDATE) and remembers the totals
//     before and after;
//  3. for every active task with an order metric, increments the payer's order
//     progress and grants any tiers that are now satisfied;
//  4. for every active task with an effective-invite tier whose amount
//     threshold was crossed by this payment, increments the inviter's
//     effective-invite progress and grants any tiers that are now satisfied.
//
// Failures in steps 1-2 and in loading the active tasks are returned.
// Per-task failures in steps 3-4 are logged with user and task context and do
// not stop the remaining tasks; this includes grant failures on the inviter
// path.
//
// NOTE(review): nothing here records that orderID has been processed, so a
// redelivered event adds the amount and increments the counters again; only
// the reward grant itself is guarded by an idempotency key.
func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error {
	// 1. Load the order amount.
	ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First()
	if err != nil {
		return err
	}
	amount := ord.ActualAmount

	// 2. Update the inviter's accumulated amount. inviterID stays 0 when the
	// payer has no invite record.
	var inviterID, oldAmount, newAmount int64
	err = s.writeDB.Transaction(func(tx *dao.Query) error {
		uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First()
		if err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return nil
			}
			return err
		}
		inviterID = uInv.InviterID
		oldAmount = uInv.AccumulatedAmount
		newAmount = oldAmount + amount
		updates := map[string]any{
			"accumulated_amount": newAmount,
		}
		_, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates)
		return err
	})
	if err != nil {
		return err
	}

	// 3. Order-driven tasks for the payer.
	tasks, err := s.getActiveTasks(ctx)
	if err != nil {
		return err
	}
	for _, t := range tasks {
		// Only tasks with at least one order-related tier are affected.
		hasOrderMetric := false
		for _, tier := range t.Tiers {
			if tier.Metric == MetricFirstOrder || tier.Metric == MetricOrderCount {
				hasOrderMetric = true
				break
			}
		}
		if !hasOrderMetric {
			continue
		}

		var p tcmodel.UserTaskProgress
		err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
			if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", userID, t.ID).First(&p).Error; err != nil {
				if errors.Is(err, gorm.ErrRecordNotFound) {
					// First event for this user/task: start at one order.
					p = tcmodel.UserTaskProgress{UserID: userID, TaskID: t.ID, OrderCount: 1, FirstOrder: 1}
					return tx.Create(&p).Error
				}
				return err
			}
			if err := s.checkAndResetDailyProgress(ctx, tx, &t, &p); err != nil {
				return err
			}
			p.OrderCount++
			if p.OrderCount == 1 {
				p.FirstOrder = 1
			}
			return tx.Save(&p).Error
		})
		if err != nil {
			s.logger.Error("failed to update progress", zap.Error(fmt.Errorf("user %d task %d order %d: %w", userID, t.ID, orderID, err)))
			continue
		}
		if err := s.matchAndGrant(ctx, &t, &p, "order", orderID, fmt.Sprintf("ord:%d", orderID)); err != nil {
			s.logger.Error("failed to grant reward", zap.Error(fmt.Errorf("user %d task %d order %d: %w", userID, t.ID, orderID, err)))
		}
	}

	// 4. Effective-invite tasks for the inviter.
	if inviterID <= 0 {
		return nil
	}
	for _, t := range tasks {
		// A task is triggered when one of its effective-invite tiers has an
		// amount threshold that this payment just crossed.
		triggered := false
		for _, tier := range t.Tiers {
			if tier.Metric != MetricEffectiveInviteCount {
				continue
			}
			var extra struct {
				AmountThreshold int64 `json:"amount_threshold"`
			}
			if len(tier.ExtraParams) > 0 {
				if err := json.Unmarshal([]byte(tier.ExtraParams), &extra); err != nil {
					// Keep going with the default threshold, but make the
					// misconfiguration visible.
					s.logger.Error("invalid tier extra params", zap.Error(fmt.Errorf("task %d tier %d: %w", t.ID, tier.ID, err)))
				}
			}
			threshold := extra.AmountThreshold
			if threshold <= 0 {
				threshold = 1 // default: any positive amount counts
			}
			if oldAmount < threshold && newAmount >= threshold {
				triggered = true
				break // one trigger per task is enough
			}
		}
		if !triggered {
			continue
		}

		var pInv tcmodel.UserTaskProgress
		err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
			if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=?", inviterID, t.ID).First(&pInv).Error; err != nil {
				if errors.Is(err, gorm.ErrRecordNotFound) {
					pInv = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, EffectiveInviteCount: 1}
					return tx.Create(&pInv).Error
				}
				return err
			}
			// The daily reset still applies to tasks that have a daily tier.
			if err := s.checkAndResetDailyProgress(ctx, tx, &t, &pInv); err != nil {
				return err
			}
			pInv.EffectiveInviteCount++
			return tx.Save(&pInv).Error
		})
		if err != nil {
			s.logger.Error("failed to update inviter progress", zap.Error(fmt.Errorf("inviter %d task %d invitee %d: %w", inviterID, t.ID, userID, err)))
			continue
		}
		// sourceID is the invitee; the event ID carries its own prefix so it
		// cannot be confused with order or plain-invite events.
		if err := s.matchAndGrant(ctx, &t, &pInv, SourceTypeEffectiveInvite, userID, fmt.Sprintf("eff_inv:%d:%d", userID, t.ID)); err != nil {
			s.logger.Error("failed to grant inviter reward", zap.Error(fmt.Errorf("inviter %d task %d invitee %d: %w", inviterID, t.ID, userID, err)))
		}
	}
	return nil
}
// OnInviteSuccess reports that inviterID successfully invited inviteeID.
// Without a queue the event is processed synchronously; otherwise it is only
// published to the queue.
func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
	if s.queue == nil {
		return s.processInviteSuccess(ctx, inviterID, inviteeID)
	}
	return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID)
}
// processInviteSuccess credits one invite to inviterID on every active task
// that has an invite-count tier, then grants any tiers that are now satisfied.
//
// Unlike the order path, the first failing task aborts the whole call and its
// error is returned; tasks processed before it keep their updated progress.
func (s *service) processInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
	tasks, err := s.getActiveTasks(ctx)
	if err != nil {
		return err
	}

	eventID := fmt.Sprintf("inv:%d", inviteeID)
	for _, t := range tasks {
		// Skip tasks that do not track invites at all.
		tracksInvites := false
		for i := range t.Tiers {
			if t.Tiers[i].Metric == MetricInviteCount {
				tracksInvites = true
				break
			}
		}
		if !tracksInvites {
			continue
		}

		var progress tcmodel.UserTaskProgress
		bump := func(tx *gorm.DB) error {
			lookupErr := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
				Where("user_id=? AND task_id=?", inviterID, t.ID).
				First(&progress).Error
			switch {
			case lookupErr == nil:
				// Existing row: handled below.
			case errors.Is(lookupErr, gorm.ErrRecordNotFound):
				// First invite for this user/task.
				progress = tcmodel.UserTaskProgress{UserID: inviterID, TaskID: t.ID, InviteCount: 1}
				return tx.Create(&progress).Error
			default:
				return lookupErr
			}
			if resetErr := s.checkAndResetDailyProgress(ctx, tx, &t, &progress); resetErr != nil {
				return resetErr
			}
			progress.InviteCount++
			return tx.Save(&progress).Error
		}
		if txErr := s.repo.GetDbW().Transaction(bump); txErr != nil {
			return txErr
		}

		if grantErr := s.matchAndGrant(ctx, &t, &progress, SourceTypeInvite, inviteeID, eventID); grantErr != nil {
			return grantErr
		}
	}
	return nil
}
// checkAndResetDailyProgress clears p's daily counters in memory when task t
// has a daily tier and p was last updated on an earlier calendar day. The
// caller is expected to persist p afterwards; nothing is written here.
//
// A task is daily when any of its tiers has Window == WindowDaily. If t.Tiers
// is not loaded, the tiers table is queried through tx instead. The column
// name is backtick-quoted in that query because WINDOW is a reserved word in
// MySQL 8.0 and the unquoted form is a syntax error there.
//
// Both timestamps are compared in the process's local time zone. Comparing
// p.UpdatedAt in whatever location the driver assigned it (often UTC) against
// the local "now" would reset progress at the wrong hour.
//
// NOTE(review): OrderCount, InviteCount, FirstOrder and ClaimedTiers are
// reset; EffectiveInviteCount is not. This mirrors the existing behavior —
// confirm it is intended.
func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error {
	isDaily := false
	if len(t.Tiers) > 0 {
		for _, tier := range t.Tiers {
			if tier.Window == WindowDaily {
				isDaily = true
				break
			}
		}
	} else {
		var count int64
		if err := tx.Model(&tcmodel.TaskTier{}).Where("task_id = ? AND `window` = ?", t.ID, WindowDaily).Count(&count).Error; err != nil {
			return err
		}
		isDaily = count > 0
	}
	// Nothing to reset for non-daily tasks or rows that were never saved.
	if !isDaily || p.UpdatedAt.IsZero() {
		return nil
	}

	now := time.Now()
	y1, m1, d1 := p.UpdatedAt.In(now.Location()).Date()
	y2, m2, d2 := now.Date()
	if y1 == y2 && m1 == m2 && d1 == d2 {
		return nil
	}

	p.OrderCount = 0
	p.InviteCount = 0
	p.FirstOrder = 0
	p.ClaimedTiers = datatypes.JSON("[]")
	return nil
}
// matchAndGrant grants every tier of task t that progress p satisfies and that
// p has not claimed yet, then records each granted tier in claimed_tiers.
//
// Tiers come from t.Tiers, or are loaded ordered by priority then id when
// t.Tiers is empty. For the counting metrics the operator OperatorGTE means
// "counter >= threshold" and any other operator means "counter == threshold";
// MetricFirstOrder is satisfied when p.FirstOrder is 1. Tiers with an unknown
// metric never match.
//
// sourceType, sourceID and eventID identify the triggering event and are
// passed through to grantTierRewards.
//
// An undecodable claimed_tiers value is returned as an error. Treating it as
// "nothing claimed" would re-grant tiers and then overwrite the stored list.
//
// NOTE(review): the tier's Repeatable flag is not consulted here — confirm
// whether repeatable tiers are handled elsewhere.
func (s *service) matchAndGrant(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string) error {
	tiers := t.Tiers
	if len(tiers) == 0 {
		// Same ordering as the other tier queries, so ties on priority are
		// resolved deterministically.
		if err := s.repo.GetDbR().Where("task_id=?", t.ID).Order("priority asc, id asc").Find(&tiers).Error; err != nil {
			return err
		}
	}

	var claimed []int64
	if len(p.ClaimedTiers) > 0 {
		if err := json.Unmarshal([]byte(p.ClaimedTiers), &claimed); err != nil {
			return fmt.Errorf("decode claimed tiers (progress %d): %w", p.ID, err)
		}
	}
	claimedSet := make(map[int64]struct{}, len(claimed))
	for _, id := range claimed {
		claimedSet[id] = struct{}{}
	}

	for _, tier := range tiers {
		if _, ok := claimedSet[tier.ID]; ok {
			continue
		}

		// reached applies the tier's operator to one counter.
		reached := func(counter int64) bool {
			if tier.Operator == OperatorGTE {
				return counter >= tier.Threshold
			}
			return counter == tier.Threshold
		}
		hit := false
		switch tier.Metric {
		case MetricFirstOrder:
			hit = p.FirstOrder == 1
		case MetricOrderCount:
			hit = reached(p.OrderCount)
		case MetricInviteCount:
			hit = reached(p.InviteCount)
		case MetricEffectiveInviteCount:
			hit = reached(p.EffectiveInviteCount)
		}
		if !hit {
			continue
		}

		if err := s.grantTierRewards(ctx, t.ID, tier.ID, p.UserID, sourceType, sourceID, eventID); err != nil {
			return err
		}

		// Record the claim against the latest stored row, under a row lock,
		// so concurrent updates to claimed_tiers are not lost.
		err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
			var latestP tcmodel.UserTaskProgress
			if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil {
				return err
			}
			var latestClaimed []int64
			if len(latestP.ClaimedTiers) > 0 {
				if err := json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed); err != nil {
					return fmt.Errorf("decode claimed tiers (progress %d): %w", latestP.ID, err)
				}
			}
			for _, id := range latestClaimed {
				if id == tier.ID {
					return nil
				}
			}
			latestClaimed = append(latestClaimed, tier.ID)
			b, err := json.Marshal(latestClaimed)
			if err != nil {
				return err
			}
			latestP.ClaimedTiers = datatypes.JSON(b)
			return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error
		})
		if err != nil {
			return err
		}
	}
	return nil
}
// grantTierRewards hands out all rewards configured for (taskID, tierID) to
// userID and then writes a "granted" event-log row.
//
// The grant is keyed by "user:task:tier:sourceType:sourceID". If a log row
// with that idempotency key already exists the call returns nil without
// granting. The lookup goes through the write handle, and a lookup failure
// other than "not found" aborts the call; granting on an inconclusive or stale
// check risks a double grant.
//
// Supported reward types and payload fields, as decoded here:
//   - "points":    Points
//   - "coupon":    CouponID, Quantity (defaults to 1)
//   - "item_card": CardID, Quantity (defaults to 1)
//   - "title":     TitleID
//
// Other reward types are skipped. A payload that cannot be decoded is logged
// and then treated as before: the decoded-so-far values apply, which normally
// means nothing is granted for that reward.
//
// The first failing grant aborts the call with its error and no log row is
// written. Grants already made through the user/title services are not undone
// here. The log insert is a single statement, so no explicit transaction is
// held open while the external services are being called.
//
// NOTE(review): the check-then-insert sequence is not atomic; two concurrent
// calls with the same key can both pass the check unless idempotency_key has a
// unique index — confirm against the schema.
func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error {
	idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID)

	var exists tcmodel.TaskEventLog
	err := s.repo.GetDbW().Where("idempotency_key=?", idk).First(&exists).Error
	if err == nil && exists.ID > 0 {
		return nil
	}
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return fmt.Errorf("check idempotency key %q: %w", idk, err)
	}

	var rewards []tcmodel.TaskReward
	if err := s.repo.GetDbR().Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil {
		return err
	}

	// decode fills dst from the reward payload and reports, but does not
	// fail on, malformed JSON.
	decode := func(r tcmodel.TaskReward, dst any) {
		if len(r.RewardPayload) == 0 {
			return
		}
		if err := json.Unmarshal([]byte(r.RewardPayload), dst); err != nil {
			s.logger.Error("invalid reward payload", zap.Error(fmt.Errorf("reward %d (%s): %w", r.ID, r.RewardType, err)))
		}
	}

	for _, r := range rewards {
		var err error
		switch r.RewardType {
		case "points":
			var pl struct{ Points int64 }
			decode(r, &pl)
			if pl.Points != 0 {
				err = s.userSvc.AddPoints(ctx, userID, pl.Points, "", "task_center", nil, nil)
			}
		case "coupon":
			var pl struct {
				CouponID int64
				Quantity int
			}
			decode(r, &pl)
			if pl.CouponID > 0 {
				qty := 1
				if pl.Quantity > 0 {
					qty = pl.Quantity
				}
				// Coupons are issued one at a time; stop at the first failure.
				for i := 0; i < qty; i++ {
					if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil {
						break
					}
				}
			}
		case "item_card":
			var pl struct {
				CardID   int64
				Quantity int
			}
			decode(r, &pl)
			if pl.CardID > 0 {
				if pl.Quantity <= 0 {
					pl.Quantity = 1
				}
				err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, pl.Quantity)
			}
		case "title":
			var pl struct{ TitleID int64 }
			decode(r, &pl)
			if pl.TitleID > 0 {
				err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center")
			}
		}
		if err != nil {
			return err
		}
	}

	entry := tcmodel.TaskEventLog{
		EventID:        eventID,
		SourceType:     sourceType,
		SourceID:       sourceID,
		UserID:         userID,
		TaskID:         taskID,
		TierID:         tierID,
		IdempotencyKey: idk,
		Status:         "granted",
	}
	return s.repo.GetDbW().Create(&entry).Error
}