1591 lines
49 KiB
Go
Executable File
1591 lines
49 KiB
Go
Executable File
package taskcenter
|
||
|
||
import (
|
||
"bindbox-game/internal/pkg/async"
|
||
"bindbox-game/internal/pkg/logger"
|
||
"bindbox-game/internal/pkg/util/remark"
|
||
"bindbox-game/internal/repository/mysql"
|
||
"bindbox-game/internal/repository/mysql/dao"
|
||
"bindbox-game/internal/repository/mysql/model"
|
||
tcmodel "bindbox-game/internal/repository/mysql/task_center"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
gamesvc "bindbox-game/internal/service/game"
|
||
titlesvc "bindbox-game/internal/service/title"
|
||
usersvc "bindbox-game/internal/service/user"
|
||
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"gorm.io/datatypes"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/clause"
|
||
)
|
||
|
||
type Service interface {
|
||
ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error)
|
||
CreateTask(ctx context.Context, in CreateTaskInput) (int64, error)
|
||
ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error
|
||
DeleteTask(ctx context.Context, id int64) error
|
||
ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error)
|
||
UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error
|
||
ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error)
|
||
UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput, deleteIDs []int64) error
|
||
GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error)
|
||
ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error
|
||
OnOrderPaid(ctx context.Context, userID int64, orderID int64) error
|
||
OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error
|
||
StartWorker(ctx context.Context)
|
||
}
|
||
|
||
type service struct {
|
||
logger logger.CustomLogger
|
||
readDB *dao.Query
|
||
writeDB *dao.Query
|
||
repo mysql.Repo
|
||
redis *redis.Client
|
||
queue async.TaskQueue
|
||
userSvc usersvc.Service
|
||
titleSvc titlesvc.Service
|
||
}
|
||
|
||
func New(l logger.CustomLogger, db mysql.Repo, rdb *redis.Client, userSvc usersvc.Service, titleSvc titlesvc.Service) Service {
|
||
var q async.TaskQueue
|
||
if rdb != nil {
|
||
q = async.NewRedisTaskQueue(rdb)
|
||
}
|
||
return &service{
|
||
logger: l,
|
||
readDB: dao.Use(db.GetDbR()),
|
||
writeDB: dao.Use(db.GetDbW()),
|
||
repo: db,
|
||
redis: rdb,
|
||
queue: q,
|
||
userSvc: userSvc,
|
||
titleSvc: titleSvc,
|
||
}
|
||
}
|
||
|
||
type ListTasksInput struct {
|
||
Page int
|
||
PageSize int
|
||
OnlyActive bool // 是否只返回有效期内的任务
|
||
}
|
||
|
||
type TaskItem struct {
|
||
ID int64
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime int64
|
||
EndTime int64
|
||
Visibility int32
|
||
Quota int32
|
||
ClaimedCount int32
|
||
Tiers []TaskTierItem
|
||
Rewards []TaskRewardItem
|
||
}
|
||
|
||
// TierProgress 记录单个档位在其配置窗口内的独立统计进度
|
||
type TierProgress struct {
|
||
TierID int64 `json:"tier_id"`
|
||
OrderCount int64 `json:"order_count"`
|
||
OrderAmount int64 `json:"order_amount"`
|
||
InviteCount int64 `json:"invite_count"`
|
||
FirstOrder bool `json:"first_order"`
|
||
}
|
||
|
||
type UserProgress struct {
|
||
TaskID int64 `json:"task_id"`
|
||
UserID int64 `json:"user_id"`
|
||
OrderCount int64 `json:"order_count"`
|
||
OrderAmount int64 `json:"order_amount"`
|
||
InviteCount int64 `json:"invite_count"`
|
||
FirstOrder bool `json:"first_order"`
|
||
ClaimedTiers []int64 `json:"claimed_tiers"`
|
||
SubProgress []ActivityProgress `json:"sub_progress"` // 各活动独立进度(向后兼容)
|
||
TierProgressMap map[int64]TierProgress `json:"tier_progress_map"` // 每个 Tier 的窗口化独立进度
|
||
}
|
||
|
||
type ActivityProgress struct {
|
||
ActivityID int64 `json:"activity_id"`
|
||
OrderCount int64 `json:"order_count"`
|
||
OrderAmount int64 `json:"order_amount"`
|
||
}
|
||
|
||
type CreateTaskInput struct {
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime *time.Time
|
||
EndTime *time.Time
|
||
Visibility int32
|
||
Quota int32
|
||
}
|
||
|
||
type ModifyTaskInput struct {
|
||
Name string
|
||
Description string
|
||
Status int32
|
||
StartTime *time.Time
|
||
EndTime *time.Time
|
||
Visibility int32
|
||
Quota int32
|
||
}
|
||
|
||
type TaskTierInput struct {
|
||
Metric string
|
||
Operator string
|
||
Threshold int64
|
||
Window string
|
||
Repeatable int32
|
||
Priority int32
|
||
ActivityID int64
|
||
ExtraParams datatypes.JSON
|
||
}
|
||
|
||
type TaskTierItem struct {
|
||
ID int64 `json:"id"`
|
||
Metric string `json:"metric"`
|
||
Operator string `json:"operator"`
|
||
Threshold int64 `json:"threshold"`
|
||
Window string `json:"window"`
|
||
Repeatable int32 `json:"repeatable"`
|
||
Priority int32 `json:"priority"`
|
||
ActivityID int64 `json:"activity_id"`
|
||
ExtraParams datatypes.JSON `json:"extra_params"`
|
||
Quota int32 `json:"quota"` // 总限额,0表示不限
|
||
ClaimedCount int32 `json:"claimed_count"` // 已领取数
|
||
Remaining int32 `json:"remaining"` // 剩余可领,-1表示不限
|
||
}
|
||
|
||
type TaskRewardInput struct {
|
||
ID int64 `json:"id"`
|
||
TierID int64 `json:"tier_id"`
|
||
RewardType string `json:"reward_type"`
|
||
RewardPayload datatypes.JSON `json:"reward_payload"`
|
||
Quantity int64 `json:"quantity"`
|
||
}
|
||
|
||
type TaskRewardItem struct {
|
||
ID int64 `json:"id"`
|
||
TierID int64 `json:"tier_id"`
|
||
RewardType string `json:"reward_type"`
|
||
RewardPayload datatypes.JSON `json:"reward_payload"`
|
||
Quantity int64 `json:"quantity"`
|
||
RewardName string `json:"reward_name"`
|
||
}
|
||
|
||
type orderMetricRow struct {
|
||
OrderID int64
|
||
ActivityID int64
|
||
DrawCount int64
|
||
TicketPrice int64
|
||
TotalAmount int64
|
||
}
|
||
|
||
var allowedWindows = map[string]struct{}{
|
||
WindowDaily: {},
|
||
WindowWeekly: {},
|
||
WindowMonthly: {},
|
||
WindowLifetime: {},
|
||
WindowActivityPeriod: {},
|
||
WindowSinceRegistration: {},
|
||
}
|
||
|
||
func normalizeWindow(value string) string {
|
||
if value == "" {
|
||
return WindowLifetime
|
||
}
|
||
if _, ok := allowedWindows[value]; !ok {
|
||
return WindowLifetime
|
||
}
|
||
return value
|
||
}
|
||
|
||
func normalizeWindowStrict(value string) (string, error) {
|
||
if value == "" {
|
||
return WindowLifetime, nil
|
||
}
|
||
if _, ok := allowedWindows[value]; !ok {
|
||
return "", fmt.Errorf("invalid window value: %s", value)
|
||
}
|
||
return value, nil
|
||
}
|
||
|
||
func tierFingerprint(metric string, threshold int64, activityID int64, window string) string {
|
||
return fmt.Sprintf("%s-%d-%d-%s", metric, threshold, activityID, window)
|
||
}
|
||
|
||
func (s *service) fetchOrderMetricRows(ctx context.Context, userID int64, activityIDs []int64, start, end *time.Time) ([]orderMetricRow, error) {
|
||
query := s.repo.GetDbR().WithContext(ctx).Table(model.TableNameOrders).
|
||
Select("orders.id AS order_id, activity_issues.activity_id AS activity_id, COUNT(activity_draw_logs.id) AS draw_count, COALESCE(activities.price_draw, 0) AS ticket_price, orders.total_amount").
|
||
Joins("JOIN activity_draw_logs ON activity_draw_logs.order_id = orders.id").
|
||
Joins("JOIN activity_issues ON activity_issues.id = activity_draw_logs.issue_id").
|
||
Joins("LEFT JOIN activities ON activities.id = activity_issues.activity_id").
|
||
Where("orders.user_id = ? AND orders.status = 2 AND orders.source_type != 1", userID).
|
||
Group("orders.id, activity_issues.activity_id, activities.price_draw, orders.total_amount")
|
||
|
||
if len(activityIDs) > 0 {
|
||
query = query.Where("activity_issues.activity_id IN ?", activityIDs)
|
||
}
|
||
if start != nil {
|
||
query = query.Where("orders.created_at >= ?", *start)
|
||
}
|
||
if end != nil {
|
||
query = query.Where("orders.created_at <= ?", *end)
|
||
}
|
||
|
||
var rows []orderMetricRow
|
||
if err := query.Scan(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
return rows, nil
|
||
}
|
||
|
||
func (s *service) calculateEffectiveAmount(row orderMetricRow) int64 {
|
||
if row.TicketPrice > 0 && row.DrawCount > 0 {
|
||
return row.TicketPrice * row.DrawCount
|
||
}
|
||
if row.TotalAmount > 0 {
|
||
if s.logger != nil && row.TicketPrice == 0 {
|
||
s.logger.Warn("task center: missing ticket price snapshot, fallback to order amount",
|
||
zap.Int64("order_id", row.OrderID),
|
||
zap.Int64("activity_id", row.ActivityID))
|
||
}
|
||
return row.TotalAmount
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func (s *service) aggregateOrderMetrics(rows []orderMetricRow, perActivity bool) (count int64, amount int64) {
|
||
if perActivity {
|
||
for _, row := range rows {
|
||
amount += s.calculateEffectiveAmount(row)
|
||
}
|
||
return int64(len(rows)), amount
|
||
}
|
||
|
||
seen := make(map[int64]struct{})
|
||
for _, row := range rows {
|
||
amount += s.calculateEffectiveAmount(row)
|
||
if _, ok := seen[row.OrderID]; !ok {
|
||
seen[row.OrderID] = struct{}{}
|
||
count++
|
||
}
|
||
}
|
||
return count, amount
|
||
}
|
||
|
||
func (s *service) countInvites(ctx context.Context, inviterID int64, activityID int64, start, end *time.Time) (int64, error) {
|
||
db := s.repo.GetDbR().WithContext(ctx)
|
||
var count int64
|
||
|
||
if activityID > 0 {
|
||
query := `
|
||
SELECT COUNT(DISTINCT ui.invitee_id)
|
||
FROM user_invites ui
|
||
INNER JOIN orders o ON o.user_id = ui.invitee_id AND o.status = 2 AND o.source_type != 1
|
||
INNER JOIN activity_draw_logs dl ON dl.order_id = o.id
|
||
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
|
||
WHERE ui.inviter_id = ? AND ai.activity_id = ?
|
||
`
|
||
args := []interface{}{inviterID, activityID}
|
||
if start != nil {
|
||
query += " AND o.created_at >= ?"
|
||
args = append(args, *start)
|
||
}
|
||
if end != nil {
|
||
query += " AND o.created_at <= ?"
|
||
args = append(args, *end)
|
||
}
|
||
if err := db.Raw(query, args...).Scan(&count).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return count, nil
|
||
}
|
||
|
||
query := db.Model(&model.UserInvites{}).Where("inviter_id = ?", inviterID)
|
||
if start != nil {
|
||
query = query.Where("created_at >= ?", *start)
|
||
}
|
||
if end != nil {
|
||
query = query.Where("created_at <= ?", *end)
|
||
}
|
||
if err := query.Count(&count).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return count, nil
|
||
}
|
||
|
||
func (s *service) countInvitesForActivities(ctx context.Context, inviterID int64, activityIDs []int64) (int64, error) {
|
||
db := s.repo.GetDbR().WithContext(ctx)
|
||
var count int64
|
||
|
||
if len(activityIDs) == 0 {
|
||
if err := db.Model(&model.UserInvites{}).Where("inviter_id = ?", inviterID).Count(&count).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return count, nil
|
||
}
|
||
|
||
if err := db.Raw(`
|
||
SELECT COUNT(DISTINCT ui.invitee_id)
|
||
FROM user_invites ui
|
||
INNER JOIN orders o ON o.user_id = ui.invitee_id AND o.status = 2 AND o.source_type != 1
|
||
INNER JOIN activity_draw_logs dl ON dl.order_id = o.id
|
||
INNER JOIN activity_issues ai ON ai.id = dl.issue_id
|
||
WHERE ui.inviter_id = ? AND ai.activity_id IN (?)
|
||
`, inviterID, activityIDs).Scan(&count).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return count, nil
|
||
}
|
||
|
||
func (s *service) ListTasks(ctx context.Context, in ListTasksInput) (items []TaskItem, total int64, err error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.Task
|
||
q := db.Model(&tcmodel.Task{})
|
||
// 过滤条件
|
||
if in.OnlyActive {
|
||
now := time.Now()
|
||
q = q.Where("status = ? AND visibility = ?", 1, 1)
|
||
q = q.Where("(start_time IS NULL OR start_time <= ?) AND (end_time IS NULL OR end_time >= ?)", now, now)
|
||
} else {
|
||
// 管理后台默认也至少基于启用状态看?或者去掉限制以便管理全量
|
||
// 维持原有逻辑:
|
||
q = q.Where("status = ? AND visibility = ?", 1, 1)
|
||
}
|
||
|
||
if in.PageSize <= 0 {
|
||
in.PageSize = 20
|
||
}
|
||
if in.Page <= 0 {
|
||
in.Page = 1
|
||
}
|
||
if err = q.Count(&total).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
if err = q.Preload("Tiers", func(db *gorm.DB) *gorm.DB {
|
||
return db.Order("priority asc, id asc")
|
||
}).Preload("Rewards", func(db *gorm.DB) *gorm.DB {
|
||
return db.Order("id asc")
|
||
}).Offset((in.Page - 1) * in.PageSize).Limit(in.PageSize).Order("id desc").Find(&rows).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
out := make([]TaskItem, len(rows))
|
||
|
||
// Pre-calculation: collect IDs for batch lookup
|
||
var couponIDs []int64
|
||
var itemCardIDs []int64
|
||
var titleIDs []int64
|
||
for _, v := range rows {
|
||
for _, r := range v.Rewards {
|
||
switch r.RewardType {
|
||
case RewardTypeCoupon:
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CouponID > 0 {
|
||
couponIDs = append(couponIDs, pl.CouponID)
|
||
}
|
||
case RewardTypeItemCard:
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CardID > 0 {
|
||
itemCardIDs = append(itemCardIDs, pl.CardID)
|
||
}
|
||
case RewardTypeTitle:
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.TitleID > 0 {
|
||
titleIDs = append(titleIDs, pl.TitleID)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Batch fetch names
|
||
couponMap := make(map[int64]string)
|
||
if len(couponIDs) > 0 {
|
||
var list []model.SystemCoupons
|
||
if err := db.Select("id, name").Find(&list, couponIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
couponMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
itemCardMap := make(map[int64]string)
|
||
if len(itemCardIDs) > 0 {
|
||
var list []model.SystemItemCards
|
||
if err := db.Select("id, name").Find(&list, itemCardIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
itemCardMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
titleMap := make(map[int64]string)
|
||
if len(titleIDs) > 0 {
|
||
var list []model.SystemTitles
|
||
if err := db.Select("id, name").Find(&list, titleIDs).Error; err == nil {
|
||
for _, v := range list {
|
||
titleMap[v.ID] = v.Name
|
||
}
|
||
}
|
||
}
|
||
|
||
for i, v := range rows {
|
||
var st, et int64
|
||
if v.StartTime != nil {
|
||
st = v.StartTime.Unix()
|
||
}
|
||
if v.EndTime != nil {
|
||
et = v.EndTime.Unix()
|
||
}
|
||
out[i] = TaskItem{ID: v.ID, Name: v.Name, Description: v.Description, Status: v.Status, StartTime: st, EndTime: et, Visibility: v.Visibility, Quota: v.Quota, ClaimedCount: v.ClaimedCount}
|
||
// 填充 Tiers
|
||
out[i].Tiers = make([]TaskTierItem, len(v.Tiers))
|
||
for j, t := range v.Tiers {
|
||
remaining := int32(-1) // -1 表示不限
|
||
if t.Quota > 0 {
|
||
remaining = t.Quota - t.ClaimedCount
|
||
if remaining < 0 {
|
||
remaining = 0
|
||
}
|
||
}
|
||
out[i].Tiers[j] = TaskTierItem{ID: t.ID, Metric: t.Metric, Operator: t.Operator, Threshold: t.Threshold, Window: normalizeWindow(t.Window), Repeatable: t.Repeatable, Priority: t.Priority, ActivityID: t.ActivityID, ExtraParams: t.ExtraParams, Quota: t.Quota, ClaimedCount: t.ClaimedCount, Remaining: remaining}
|
||
}
|
||
// 填充 Rewards
|
||
out[i].Rewards = make([]TaskRewardItem, len(v.Rewards))
|
||
for j, r := range v.Rewards {
|
||
name := ""
|
||
switch r.RewardType {
|
||
case RewardTypePoints:
|
||
name = "积分"
|
||
case RewardTypeGameTicket:
|
||
name = "抽奖券"
|
||
case RewardTypeCoupon:
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = couponMap[pl.CouponID]
|
||
case RewardTypeItemCard:
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = itemCardMap[pl.CardID]
|
||
case RewardTypeTitle:
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
name = titleMap[pl.TitleID]
|
||
}
|
||
out[i].Rewards[j] = TaskRewardItem{ID: r.ID, TierID: r.TierID, RewardType: r.RewardType, RewardPayload: r.RewardPayload, Quantity: r.Quantity, RewardName: name}
|
||
}
|
||
}
|
||
return out, total, nil
|
||
}
|
||
|
||
// computeTimeWindow 根据 window 配置计算时间范围
|
||
// 返回 (windowStart, windowEnd),nil 表示该端不限制
|
||
// 重要:所有窗口类型都受任务时间约束,防止历史数据被用于领取新任务
|
||
func computeTimeWindow(window string, taskStart, taskEnd *time.Time) (start *time.Time, end *time.Time) {
|
||
now := time.Now()
|
||
switch window {
|
||
case WindowDaily:
|
||
s := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
|
||
return applyTaskBounds(&s, &now, taskStart, taskEnd)
|
||
case WindowWeekly:
|
||
weekday := int(now.Weekday())
|
||
if weekday == 0 {
|
||
weekday = 7
|
||
}
|
||
s := now.AddDate(0, 0, -(weekday - 1))
|
||
s = time.Date(s.Year(), s.Month(), s.Day(), 0, 0, 0, 0, s.Location())
|
||
return applyTaskBounds(&s, &now, taskStart, taskEnd)
|
||
case WindowMonthly:
|
||
s := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
|
||
return applyTaskBounds(&s, &now, taskStart, taskEnd)
|
||
case WindowActivityPeriod:
|
||
// 使用任务级别的 StartTime / EndTime,nil 端不加限制
|
||
return taskStart, taskEnd
|
||
default:
|
||
// lifetime / since_registration / 空值 / 未知值
|
||
// CRITICAL FIX: 受任务时间约束,防止历史数据领取新任务
|
||
return taskStart, taskEnd
|
||
}
|
||
}
|
||
|
||
// applyTaskBounds 将窗口时间与任务时间取交集,确保不超过任务有效期
|
||
func applyTaskBounds(windowStart, windowEnd, taskStart, taskEnd *time.Time) (*time.Time, *time.Time) {
|
||
start := windowStart
|
||
end := windowEnd
|
||
|
||
// 如果任务开始时间晚于窗口开始时间,使用任务开始时间
|
||
if taskStart != nil && (start == nil || taskStart.After(*start)) {
|
||
start = taskStart
|
||
}
|
||
|
||
// 如果任务结束时间早于窗口结束时间,使用任务结束时间
|
||
if taskEnd != nil && (end == nil || taskEnd.Before(*end)) {
|
||
end = taskEnd
|
||
}
|
||
|
||
return start, end
|
||
}
|
||
|
||
func (s *service) GetUserProgress(ctx context.Context, userID int64, taskID int64) (*UserProgress, error) {
|
||
db := s.repo.GetDbR()
|
||
|
||
var task tcmodel.Task
|
||
if err := db.First(&task, taskID).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var tiers []tcmodel.TaskTier
|
||
if err := db.Where("task_id = ?", taskID).Find(&tiers).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
targetActivityIDs := make([]int64, 0)
|
||
seenActivity := make(map[int64]struct{})
|
||
for _, t := range tiers {
|
||
if t.ActivityID > 0 {
|
||
if _, ok := seenActivity[t.ActivityID]; !ok {
|
||
seenActivity[t.ActivityID] = struct{}{}
|
||
targetActivityIDs = append(targetActivityIDs, t.ActivityID)
|
||
}
|
||
}
|
||
}
|
||
|
||
type windowGroupKey struct {
|
||
Window string
|
||
ActivityID int64
|
||
}
|
||
groupMap := make(map[windowGroupKey][]tcmodel.TaskTier)
|
||
for _, t := range tiers {
|
||
window := normalizeWindow(t.Window)
|
||
t.Window = window
|
||
key := windowGroupKey{Window: window, ActivityID: t.ActivityID}
|
||
groupMap[key] = append(groupMap[key], t)
|
||
}
|
||
|
||
tierProgressMap := make(map[int64]TierProgress)
|
||
for wk, groupTiers := range groupMap {
|
||
wStart, wEnd := computeTimeWindow(wk.Window, task.StartTime, task.EndTime)
|
||
var activityIDs []int64
|
||
perActivity := false
|
||
if wk.ActivityID > 0 {
|
||
activityIDs = []int64{wk.ActivityID}
|
||
perActivity = true
|
||
}
|
||
rows, err := s.fetchOrderMetricRows(ctx, userID, activityIDs, wStart, wEnd)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
orderCount, orderAmount := s.aggregateOrderMetrics(rows, perActivity)
|
||
inviteCount, err := s.countInvites(ctx, userID, wk.ActivityID, wStart, wEnd)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
for _, tier := range groupTiers {
|
||
tierProgressMap[tier.ID] = TierProgress{
|
||
TierID: tier.ID,
|
||
OrderCount: orderCount,
|
||
OrderAmount: orderAmount,
|
||
InviteCount: inviteCount,
|
||
FirstOrder: orderCount > 0,
|
||
}
|
||
}
|
||
}
|
||
|
||
var (
|
||
allRows []orderMetricRow
|
||
err error
|
||
)
|
||
if len(targetActivityIDs) > 0 {
|
||
allRows, err = s.fetchOrderMetricRows(ctx, userID, targetActivityIDs, nil, nil)
|
||
} else {
|
||
allRows, err = s.fetchOrderMetricRows(ctx, userID, nil, nil, nil)
|
||
}
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
orderCount, orderAmount := s.aggregateOrderMetrics(allRows, false)
|
||
|
||
var subProgressList []ActivityProgress
|
||
if len(targetActivityIDs) > 0 {
|
||
subStats := make(map[int64]ActivityProgress)
|
||
for _, row := range allRows {
|
||
if row.ActivityID == 0 {
|
||
continue
|
||
}
|
||
stat := subStats[row.ActivityID]
|
||
stat.ActivityID = row.ActivityID
|
||
stat.OrderCount++
|
||
stat.OrderAmount += s.calculateEffectiveAmount(row)
|
||
subStats[row.ActivityID] = stat
|
||
}
|
||
subProgressList = make([]ActivityProgress, 0, len(targetActivityIDs))
|
||
for _, actID := range targetActivityIDs {
|
||
if stat, ok := subStats[actID]; ok {
|
||
subProgressList = append(subProgressList, stat)
|
||
}
|
||
}
|
||
}
|
||
|
||
inviteCount, err := s.countInvitesForActivities(ctx, userID, targetActivityIDs)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var progressRows []tcmodel.UserTaskProgress
|
||
if err := db.Where("user_id=? AND task_id=?", userID, taskID).Find(&progressRows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
claimedSet := map[int64]struct{}{}
|
||
for _, row := range progressRows {
|
||
var claimed []int64
|
||
if len(row.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(row.ClaimedTiers), &claimed)
|
||
}
|
||
for _, id := range claimed {
|
||
claimedSet[id] = struct{}{}
|
||
}
|
||
}
|
||
|
||
allClaimed := make([]int64, 0, len(claimedSet))
|
||
for id := range claimedSet {
|
||
allClaimed = append(allClaimed, id)
|
||
}
|
||
|
||
hasFirstOrder := orderCount > 0
|
||
|
||
return &UserProgress{
|
||
TaskID: taskID,
|
||
UserID: userID,
|
||
OrderCount: orderCount,
|
||
OrderAmount: orderAmount,
|
||
InviteCount: inviteCount,
|
||
FirstOrder: hasFirstOrder,
|
||
ClaimedTiers: allClaimed,
|
||
SubProgress: subProgressList,
|
||
TierProgressMap: tierProgressMap,
|
||
}, nil
|
||
}
|
||
|
||
func (s *service) ClaimTier(ctx context.Context, userID int64, taskID int64, tierID int64) error {
|
||
// BUG FIX: 增加前置校验,确保用户真的完成了该档位任务
|
||
progress, err := s.GetUserProgress(ctx, userID, taskID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 获取档位配置
|
||
var tier tcmodel.TaskTier
|
||
if err := s.repo.GetDbR().First(&tier, tierID).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
var task tcmodel.Task
|
||
if err := s.repo.GetDbR().First(&task, taskID).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
// CRITICAL FIX: 为所有档位添加 Redis 分布式锁,防止并发重复领取
|
||
// - activityID > 0: 使用 activity_id 作为锁键(同活动跨任务场景)
|
||
// - activityID = 0: 使用 task_id 作为锁键(全局档位场景)
|
||
if s.redis != nil {
|
||
var claimLockKey string
|
||
if tier.ActivityID > 0 {
|
||
claimLockKey = fmt.Sprintf("tc:claim_lock:%d:%d", userID, tier.ActivityID)
|
||
} else {
|
||
claimLockKey = fmt.Sprintf("tc:claim_lock_task:%d:%d", userID, taskID)
|
||
}
|
||
locked, lockErr := s.redis.SetNX(ctx, claimLockKey, "1", 10*time.Second).Result()
|
||
if lockErr != nil {
|
||
s.logger.Error("ClaimTier: Redis lock error",
|
||
zap.Error(lockErr),
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("activity_id", tier.ActivityID),
|
||
zap.Int64("task_id", taskID))
|
||
return lockErr
|
||
}
|
||
if !locked {
|
||
return errors.New("操作频繁,请稍后再试")
|
||
}
|
||
defer s.redis.Del(ctx, claimLockKey)
|
||
}
|
||
|
||
// 校验是否达标
|
||
// Bug1 修复:优先使用 TierProgressMap(窗口化进度),回退到全局进度
|
||
hit := false
|
||
|
||
var currentOrderCount, currentOrderAmount, currentInviteCount int64
|
||
if tp, ok := progress.TierProgressMap[tierID]; ok {
|
||
// 使用该 tier 所配置 window 内的独立统计值
|
||
currentOrderCount = tp.OrderCount
|
||
currentOrderAmount = tp.OrderAmount
|
||
currentInviteCount = tp.InviteCount
|
||
} else if tier.ActivityID > 0 {
|
||
// 回退:从 SubProgress 中找对应活动的进度
|
||
for _, sub := range progress.SubProgress {
|
||
if sub.ActivityID == tier.ActivityID {
|
||
currentOrderCount = sub.OrderCount
|
||
currentOrderAmount = sub.OrderAmount
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
currentOrderCount = progress.OrderCount
|
||
currentOrderAmount = progress.OrderAmount
|
||
currentInviteCount = progress.InviteCount
|
||
}
|
||
|
||
var currentValue int64
|
||
switch tier.Metric {
|
||
case MetricFirstOrder:
|
||
hit = progress.FirstOrder
|
||
case MetricOrderCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = currentOrderCount >= tier.Threshold
|
||
} else {
|
||
hit = currentOrderCount == tier.Threshold
|
||
}
|
||
currentValue = currentOrderCount
|
||
case MetricOrderAmount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = currentOrderAmount >= tier.Threshold
|
||
} else {
|
||
hit = currentOrderAmount == tier.Threshold
|
||
}
|
||
currentValue = currentOrderAmount
|
||
case MetricInviteCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = currentInviteCount >= tier.Threshold
|
||
} else {
|
||
hit = currentInviteCount == tier.Threshold
|
||
}
|
||
currentValue = currentInviteCount
|
||
}
|
||
|
||
// BUG2 FIX: 跨任务累加校验 —— 防止多任务共享同一 activityID 订单池,用户用同一批订单重复领多个任务奖励
|
||
// 规则:同一 activityID + 同一 metric 下,不同 taskID 间各取已领最大 threshold 后求和,
|
||
// 要求 currentValue >= consumedThreshold(已消耗)+ tier.Threshold(本次需消耗)
|
||
if tier.ActivityID > 0 && (tier.Metric == MetricOrderCount || tier.Metric == MetricOrderAmount || tier.Metric == MetricInviteCount) {
|
||
consumedThreshold, err := s.calculateCrossTaskConsumedThreshold(userID, &task, &tier)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if currentValue < consumedThreshold+tier.Threshold {
|
||
s.logger.Warn("ClaimTier: cross-task threshold validation failed",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("task_id", taskID),
|
||
zap.Int64("tier_id", tierID),
|
||
zap.Int64("current_value", currentValue),
|
||
zap.Int64("consumed_threshold", consumedThreshold),
|
||
zap.Int64("tier_threshold", tier.Threshold),
|
||
)
|
||
return errors.New("订单量不足,已被其他任务消耗,无法领取")
|
||
}
|
||
}
|
||
|
||
if !hit {
|
||
return errors.New("任务条件未达成,无法领取")
|
||
}
|
||
|
||
// 1.5 校验任务有效期
|
||
now := time.Now()
|
||
if task.StartTime != nil && now.Before(*task.StartTime) {
|
||
return errors.New("任务尚未开始")
|
||
}
|
||
if task.EndTime != nil && now.After(*task.EndTime) {
|
||
return errors.New("任务已经结束")
|
||
}
|
||
|
||
if task.Quota > 0 {
|
||
result := s.repo.GetDbW().Model(&tcmodel.Task{}).
|
||
Where("id = ? AND claimed_count < quota", taskID).
|
||
Update("claimed_count", gorm.Expr("claimed_count + 1"))
|
||
if result.Error != nil {
|
||
return result.Error
|
||
}
|
||
if result.RowsAffected == 0 {
|
||
return errors.New("奖励已领完")
|
||
}
|
||
s.logger.Info("ClaimTier: Task quota check passed", zap.Int64("task_id", taskID), zap.Int32("quota", task.Quota))
|
||
}
|
||
|
||
// 3. 先尝试发放奖励 (grantTierRewards 内部有幂等校验)
|
||
// IDK logic inside grantTierRewards ensures we don't double grant.
|
||
// We use "manual_claim" as source type.
|
||
// IMPORTANT: Call this BEFORE updating the progress status to avoid "Claimed but not received" state if grant fails.
|
||
s.logger.Info("ClaimTier: Starting reward grant...", zap.Int64("user_id", userID), zap.Int64("task_id", taskID), zap.Int64("tier_id", tierID))
|
||
if err := s.grantTierRewards(ctx, taskID, tierID, userID, "manual_claim", 0, fmt.Sprintf("claim:%d:%d:%d", userID, taskID, tierID)); err != nil {
|
||
s.logger.Error("ClaimTier: Reward grant failed", zap.Error(err), zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
|
||
return err
|
||
}
|
||
s.logger.Info("ClaimTier: Reward granted successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
|
||
|
||
// 2. 奖励发放成功后,事务中更新领取状态
|
||
err = s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
var p tcmodel.UserTaskProgress
|
||
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("user_id=? AND task_id=? AND activity_id=0", userID, taskID).First(&p).Error
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
// 实时模式兼容:自动创建进度记录用于存储已领取状态
|
||
p = tcmodel.UserTaskProgress{
|
||
UserID: userID,
|
||
TaskID: taskID,
|
||
ActivityID: 0,
|
||
ClaimedTiers: datatypes.JSON("[]"),
|
||
}
|
||
if err := tx.Create(&p).Error; err != nil {
|
||
return err
|
||
}
|
||
} else {
|
||
return err
|
||
}
|
||
}
|
||
var claimed []int64
|
||
if len(p.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
|
||
}
|
||
for _, id := range claimed {
|
||
if id == tierID {
|
||
return nil // 已更新状态,无需重复更新
|
||
}
|
||
}
|
||
claimed = append(claimed, tierID)
|
||
b, _ := json.Marshal(claimed)
|
||
p.ClaimedTiers = datatypes.JSON(b)
|
||
return tx.Model(&tcmodel.UserTaskProgress{}).Where("id=?", p.ID).Update("claimed_tiers", p.ClaimedTiers).Error
|
||
})
|
||
if err != nil {
|
||
s.logger.Error("ClaimTier: Failed to update status", zap.Error(err))
|
||
return err
|
||
}
|
||
s.logger.Info("ClaimTier: Status updated successfully", zap.Int64("user_id", userID), zap.Int64("tier_id", tierID))
|
||
return nil
|
||
}
|
||
|
||
type siblingTierRow struct {
|
||
TierID int64 `gorm:"column:tier_id"`
|
||
TaskID int64 `gorm:"column:task_id"`
|
||
Threshold int64 `gorm:"column:threshold"`
|
||
TaskStart *time.Time `gorm:"column:task_start"`
|
||
TaskEnd *time.Time `gorm:"column:task_end"`
|
||
TaskCreated time.Time `gorm:"column:task_created"`
|
||
}
|
||
|
||
func tasksOverlapWindow(aStart, aEnd, bStart, bEnd *time.Time) bool {
|
||
if aStart != nil && bEnd != nil && (aStart.Equal(*bEnd) || aStart.After(*bEnd)) {
|
||
return false
|
||
}
|
||
if bStart != nil && aEnd != nil && (bStart.Equal(*aEnd) || bStart.After(*aEnd)) {
|
||
return false
|
||
}
|
||
return true
|
||
}
|
||
|
||
func (s *service) calculateCrossTaskConsumedThreshold(userID int64, task *tcmodel.Task, tier *tcmodel.TaskTier) (int64, error) {
|
||
tierTable := tcmodel.TaskTier{}.TableName()
|
||
taskTable := tcmodel.Task{}.TableName()
|
||
|
||
var siblingRows []siblingTierRow
|
||
if err := s.repo.GetDbR().
|
||
Table(tierTable+" AS tiers").
|
||
Select("tiers.id AS tier_id, tiers.task_id AS task_id, tiers.threshold, tasks.start_time AS task_start, tasks.end_time AS task_end, tasks.created_at AS task_created").
|
||
Joins(fmt.Sprintf("JOIN %s AS tasks ON tasks.id = tiers.task_id", taskTable)).
|
||
Where("tiers.activity_id = ? AND tiers.metric = ? AND tiers.task_id <> ?", tier.ActivityID, tier.Metric, task.ID).
|
||
Find(&siblingRows).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
tierThreshold := make(map[int64]int64)
|
||
siblingTaskSet := make(map[int64]struct{})
|
||
taskStart := chooseTaskStart(task.StartTime, task.CreatedAt)
|
||
for _, row := range siblingRows {
|
||
if row.TaskCreated.Before(task.CreatedAt) {
|
||
continue
|
||
}
|
||
if !tasksOverlapWindow(taskStart, task.EndTime, chooseTaskStart(row.TaskStart, row.TaskCreated), row.TaskEnd) {
|
||
continue
|
||
}
|
||
tierThreshold[row.TierID] = row.Threshold
|
||
siblingTaskSet[row.TaskID] = struct{}{}
|
||
}
|
||
if len(siblingTaskSet) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
taskIDs := make([]int64, 0, len(siblingTaskSet))
|
||
for id := range siblingTaskSet {
|
||
taskIDs = append(taskIDs, id)
|
||
}
|
||
|
||
var siblingProgresses []tcmodel.UserTaskProgress
|
||
if err := s.repo.GetDbR().
|
||
Where("user_id = ? AND task_id IN ? AND activity_id = 0", userID, taskIDs).
|
||
Find(&siblingProgresses).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
perTaskMax := make(map[int64]int64)
|
||
for _, sp := range siblingProgresses {
|
||
var claimedIDs []int64
|
||
if len(sp.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(sp.ClaimedTiers), &claimedIDs)
|
||
}
|
||
var maxThreshold int64
|
||
for _, id := range claimedIDs {
|
||
if th, ok := tierThreshold[id]; ok && th > maxThreshold {
|
||
maxThreshold = th
|
||
}
|
||
}
|
||
if maxThreshold > 0 {
|
||
if prev, ok := perTaskMax[sp.TaskID]; !ok || maxThreshold > prev {
|
||
perTaskMax[sp.TaskID] = maxThreshold
|
||
}
|
||
}
|
||
}
|
||
|
||
var consumed int64
|
||
for _, val := range perTaskMax {
|
||
consumed += val
|
||
}
|
||
return consumed, nil
|
||
}
|
||
|
||
func chooseTaskStart(start *time.Time, created time.Time) *time.Time {
|
||
if start != nil {
|
||
return start
|
||
}
|
||
if created.IsZero() {
|
||
return nil
|
||
}
|
||
c := created
|
||
return &c
|
||
}
|
||
|
||
func (s *service) CreateTask(ctx context.Context, in CreateTaskInput) (int64, error) {
|
||
db := s.repo.GetDbW()
|
||
row := &tcmodel.Task{Name: in.Name, Description: in.Description, Status: in.Status, StartTime: in.StartTime, EndTime: in.EndTime, Visibility: in.Visibility, Quota: in.Quota, ClaimedCount: 0}
|
||
if err := db.Create(row).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
return row.ID, s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) ModifyTask(ctx context.Context, id int64, in ModifyTaskInput) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Model(&tcmodel.Task{}).Where("id=?", id).Updates(map[string]any{"name": in.Name, "description": in.Description, "status": in.Status, "start_time": in.StartTime, "end_time": in.EndTime, "visibility": in.Visibility, "quota": in.Quota}).Error; err != nil {
|
||
return err
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) DeleteTask(ctx context.Context, id int64) error {
|
||
db := s.repo.GetDbW()
|
||
if err := db.Where("id=?", id).Delete(&tcmodel.Task{}).Error; err != nil {
|
||
return err
|
||
}
|
||
return s.invalidateCache(ctx)
|
||
}
|
||
|
||
func (s *service) ListTaskTiers(ctx context.Context, taskID int64) ([]TaskTierItem, error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.TaskTier
|
||
if err := db.Where("task_id=?", taskID).Order("priority asc, id asc").Find(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
out := make([]TaskTierItem, len(rows))
|
||
for i, v := range rows {
|
||
remaining := int32(-1) // -1 表示不限
|
||
if v.Quota > 0 {
|
||
remaining = v.Quota - v.ClaimedCount
|
||
if remaining < 0 {
|
||
remaining = 0
|
||
}
|
||
}
|
||
out[i] = TaskTierItem{ID: v.ID, Metric: v.Metric, Operator: v.Operator, Threshold: v.Threshold, Window: normalizeWindow(v.Window), Repeatable: v.Repeatable, Priority: v.Priority, ActivityID: v.ActivityID, ExtraParams: v.ExtraParams, Quota: v.Quota, ClaimedCount: v.ClaimedCount, Remaining: remaining}
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func (s *service) UpsertTaskTiers(ctx context.Context, taskID int64, tiers []TaskTierInput) error {
|
||
db := s.repo.GetDbW()
|
||
// 1. 获取现有档位
|
||
var existing []tcmodel.TaskTier
|
||
if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
existingMap := make(map[string]tcmodel.TaskTier)
|
||
for _, t := range existing {
|
||
window := normalizeWindow(t.Window)
|
||
t.Window = window
|
||
key := tierFingerprint(t.Metric, t.Threshold, t.ActivityID, window)
|
||
existingMap[key] = t
|
||
}
|
||
|
||
var toDelete []int64
|
||
var toUpdate []tcmodel.TaskTier
|
||
var toCreate []tcmodel.TaskTier
|
||
|
||
processedKeys := make(map[string]struct{})
|
||
for _, t := range tiers {
|
||
window, err := normalizeWindowStrict(t.Window)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
key := tierFingerprint(t.Metric, t.Threshold, t.ActivityID, window)
|
||
if old, ok := existingMap[key]; ok {
|
||
// 更新现有记录,保留 ID 和 ClaimedCount
|
||
old.Operator = t.Operator
|
||
old.Window = window
|
||
old.Repeatable = t.Repeatable
|
||
old.Priority = t.Priority
|
||
old.ExtraParams = t.ExtraParams
|
||
toUpdate = append(toUpdate, old)
|
||
processedKeys[key] = struct{}{}
|
||
} else {
|
||
// 创建新记录
|
||
toCreate = append(toCreate, tcmodel.TaskTier{
|
||
TaskID: taskID,
|
||
Metric: t.Metric,
|
||
Operator: t.Operator,
|
||
Threshold: t.Threshold,
|
||
Window: window,
|
||
Repeatable: t.Repeatable,
|
||
Priority: t.Priority,
|
||
ActivityID: t.ActivityID,
|
||
ExtraParams: t.ExtraParams,
|
||
})
|
||
}
|
||
}
|
||
|
||
for key, old := range existingMap {
|
||
if _, ok := processedKeys[key]; !ok {
|
||
toDelete = append(toDelete, old.ID)
|
||
}
|
||
}
|
||
|
||
return db.Transaction(func(tx *gorm.DB) error {
|
||
if len(toDelete) > 0 {
|
||
if err := tx.Delete(&tcmodel.TaskTier{}, toDelete).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
for _, t := range toUpdate {
|
||
if err := tx.Save(&t).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if len(toCreate) > 0 {
|
||
if err := tx.Create(&toCreate).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
|
||
func (s *service) ListTaskRewards(ctx context.Context, taskID int64) ([]TaskRewardItem, error) {
|
||
db := s.repo.GetDbR()
|
||
var rows []tcmodel.TaskReward
|
||
if err := db.Where("task_id=?", taskID).Order("id asc").Find(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
out := make([]TaskRewardItem, len(rows))
|
||
for i, v := range rows {
|
||
out[i] = TaskRewardItem{ID: v.ID, TierID: v.TierID, RewardType: v.RewardType, RewardPayload: v.RewardPayload, Quantity: v.Quantity}
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func (s *service) UpsertTaskRewards(ctx context.Context, taskID int64, rewards []TaskRewardInput, deleteIDs []int64) error {
|
||
db := s.repo.GetDbW()
|
||
|
||
var existing []tcmodel.TaskReward
|
||
if err := db.Where("task_id=?", taskID).Find(&existing).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
existingByID := make(map[int64]tcmodel.TaskReward, len(existing))
|
||
for _, r := range existing {
|
||
existingByID[r.ID] = r
|
||
}
|
||
|
||
var toUpdate []tcmodel.TaskReward
|
||
var toCreate []tcmodel.TaskReward
|
||
seen := make(map[int64]struct{})
|
||
|
||
for _, r := range rewards {
|
||
if r.ID > 0 {
|
||
old, ok := existingByID[r.ID]
|
||
if !ok || old.TaskID != taskID {
|
||
return fmt.Errorf("reward %d not found", r.ID)
|
||
}
|
||
old.TierID = r.TierID
|
||
old.RewardType = r.RewardType
|
||
old.RewardPayload = r.RewardPayload
|
||
old.Quantity = r.Quantity
|
||
toUpdate = append(toUpdate, old)
|
||
seen[r.ID] = struct{}{}
|
||
continue
|
||
}
|
||
|
||
toCreate = append(toCreate, tcmodel.TaskReward{
|
||
TaskID: taskID,
|
||
TierID: r.TierID,
|
||
RewardType: r.RewardType,
|
||
RewardPayload: r.RewardPayload,
|
||
Quantity: r.Quantity,
|
||
})
|
||
}
|
||
|
||
var toDelete []int64
|
||
if len(deleteIDs) > 0 {
|
||
for _, id := range deleteIDs {
|
||
if reward, ok := existingByID[id]; ok {
|
||
toDelete = append(toDelete, reward.ID)
|
||
}
|
||
}
|
||
} else {
|
||
for id := range existingByID {
|
||
if _, ok := seen[id]; !ok {
|
||
toDelete = append(toDelete, id)
|
||
}
|
||
}
|
||
}
|
||
|
||
return db.Transaction(func(tx *gorm.DB) error {
|
||
if len(toDelete) > 0 {
|
||
if err := tx.Delete(&tcmodel.TaskReward{}, toDelete).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
for _, r := range toUpdate {
|
||
if err := tx.Save(&r).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if len(toCreate) > 0 {
|
||
if err := tx.Create(&toCreate).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
|
||
func (s *service) OnOrderPaid(ctx context.Context, userID int64, orderID int64) error {
|
||
if s.queue != nil {
|
||
return s.queue.PublishOrderPaid(ctx, userID, orderID)
|
||
}
|
||
return s.processOrderPaid(ctx, userID, orderID)
|
||
}
|
||
|
||
func (s *service) processOrderPaid(ctx context.Context, userID int64, orderID int64) error {
|
||
// 1. 获取订单金额
|
||
ord, err := s.readDB.Orders.WithContext(ctx).Where(s.readDB.Orders.ID.Eq(orderID)).First()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 1.0 状态校验与幂等性检查
|
||
// 仅处理已支付订单
|
||
if ord.Status != 2 {
|
||
s.logger.Warn("Order not paid, skip task center", zap.Int64("order_id", orderID), zap.Int32("status", ord.Status))
|
||
return nil
|
||
}
|
||
|
||
// 使用 Redis 进行 24 小时内的幂等性拦截,防止重复触发进度计算
|
||
if s.redis != nil {
|
||
lockKey := fmt.Sprintf("tc:proc:order:%d", orderID)
|
||
set, err := s.redis.SetNX(ctx, lockKey, "1", 24*time.Hour).Result()
|
||
if err != nil {
|
||
s.logger.Error("Redis idempotency check failed", zap.Error(err))
|
||
// 如果 Redis 异常,为了保险起见,我们可以选择继续处理(由数据库事务保证底层原子性,虽然非严格幂等)
|
||
// 或者返回错误。这里选择返回错误让调用方重试或记录日志。
|
||
return err
|
||
}
|
||
if !set {
|
||
s.logger.Info("Order already processed by task center", zap.Int64("order_id", orderID))
|
||
return nil
|
||
}
|
||
}
|
||
|
||
amount := ord.ActualAmount
|
||
rmk := remark.Parse(ord.Remark)
|
||
_ = rmk // 手动模式下不再需要 activityID
|
||
|
||
// 2. 更新邀请人累计金额(用于 GetUserProgress 中判断有效邀请)
|
||
|
||
// 使用事务更新 UserInvites
|
||
err = s.writeDB.Transaction(func(tx *dao.Query) error {
|
||
uInv, err := tx.UserInvites.WithContext(ctx).Clauses(clause.Locking{Strength: "UPDATE"}).Where(tx.UserInvites.InviteeID.Eq(userID)).First()
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
newAmount := uInv.AccumulatedAmount + amount
|
||
updates := map[string]any{
|
||
"accumulated_amount": newAmount,
|
||
}
|
||
_, err = tx.UserInvites.WithContext(ctx).Where(tx.UserInvites.ID.Eq(uInv.ID)).Updates(updates)
|
||
return err
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 手动领取模式:进度从订单表实时统计,此处不再预计算
|
||
// 仅保留邀请金额累计,用于判断有效邀请
|
||
return nil
|
||
}
|
||
|
||
func (s *service) OnInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
|
||
if s.queue != nil {
|
||
return s.queue.PublishInviteSuccess(ctx, inviterID, inviteeID)
|
||
}
|
||
return s.processInviteSuccess(ctx, inviterID, inviteeID)
|
||
}
|
||
|
||
func (s *service) processInviteSuccess(ctx context.Context, inviterID int64, inviteeID int64) error {
|
||
// 手动领取模式:邀请数从 user_invites 表实时统计,此处不再预计算
|
||
return nil
|
||
}
|
||
|
||
func (s *service) checkAndResetDailyProgress(ctx context.Context, tx *gorm.DB, t *tcmodel.Task, p *tcmodel.UserTaskProgress) error {
|
||
isDaily := false
|
||
if len(t.Tiers) > 0 {
|
||
for _, tier := range t.Tiers {
|
||
if tier.Window == WindowDaily {
|
||
isDaily = true
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
var count int64
|
||
if err := tx.Model(&tcmodel.TaskTier{}).Where("task_id = ? AND window = ?", t.ID, WindowDaily).Count(&count).Error; err != nil {
|
||
return err
|
||
}
|
||
isDaily = count > 0
|
||
}
|
||
|
||
if !isDaily {
|
||
return nil
|
||
}
|
||
now := time.Now()
|
||
if p.UpdatedAt.IsZero() {
|
||
return nil
|
||
}
|
||
y1, m1, d1 := p.UpdatedAt.Date()
|
||
y2, m2, d2 := now.Date()
|
||
if y1 == y2 && m1 == m2 && d1 == d2 {
|
||
return nil
|
||
}
|
||
s.logger.Info("Daily progress reset",
|
||
zap.Int64("user_id", p.UserID),
|
||
zap.Int64("task_id", t.ID),
|
||
zap.Time("last_update", p.UpdatedAt),
|
||
zap.Time("now", now),
|
||
)
|
||
p.OrderCount = 0
|
||
p.OrderAmount = 0
|
||
p.InviteCount = 0
|
||
p.FirstOrder = 0
|
||
p.ClaimedTiers = datatypes.JSON("[]")
|
||
return nil
|
||
}
|
||
|
||
func (s *service) matchAndGrantExtended(ctx context.Context, t *tcmodel.Task, p *tcmodel.UserTaskProgress, sourceType string, sourceID int64, eventID string, isGlobalNewUser bool) error {
|
||
tiers := t.Tiers
|
||
if len(tiers) == 0 {
|
||
if err := s.repo.GetDbR().Where("task_id=?", t.ID).Order("priority asc").Find(&tiers).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
var claimed []int64
|
||
if len(p.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(p.ClaimedTiers), &claimed)
|
||
}
|
||
claimedSet := map[int64]struct{}{}
|
||
for _, id := range claimed {
|
||
claimedSet[id] = struct{}{}
|
||
}
|
||
for _, tier := range tiers {
|
||
if _, ok := claimedSet[tier.ID]; ok {
|
||
continue
|
||
}
|
||
|
||
// 活动维度过滤:如果 Tier 指定了 ActivityID,必须与进度记录中的 ActivityID 一致
|
||
if tier.ActivityID > 0 && tier.ActivityID != p.ActivityID {
|
||
continue
|
||
}
|
||
|
||
var extra struct {
|
||
NewUserOnly bool `json:"new_user_only"`
|
||
}
|
||
if len(tier.ExtraParams) > 0 {
|
||
_ = json.Unmarshal([]byte(tier.ExtraParams), &extra)
|
||
}
|
||
|
||
s.logger.Debug("Evaluating tier",
|
||
zap.Int64("tier_id", tier.ID),
|
||
zap.String("metric", tier.Metric),
|
||
zap.Int64("order_count", p.OrderCount),
|
||
zap.Int64("threshold", tier.Threshold),
|
||
zap.Int64("activity_id", tier.ActivityID),
|
||
zap.Int64("my_activity_id", p.ActivityID),
|
||
zap.Int32("first_order", p.FirstOrder),
|
||
zap.Bool("is_global_new", isGlobalNewUser),
|
||
)
|
||
|
||
hit := false
|
||
switch tier.Metric {
|
||
case MetricFirstOrder:
|
||
hit = p.FirstOrder == 1
|
||
// 如果要求新用户,则必须满足全局新用户条件
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricOrderCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.OrderCount >= tier.Threshold
|
||
} else {
|
||
hit = p.OrderCount == tier.Threshold
|
||
}
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricOrderAmount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.OrderAmount >= tier.Threshold
|
||
} else {
|
||
hit = p.OrderAmount == tier.Threshold
|
||
}
|
||
if extra.NewUserOnly && !isGlobalNewUser {
|
||
hit = false
|
||
}
|
||
case MetricInviteCount:
|
||
if tier.Operator == OperatorGTE {
|
||
hit = p.InviteCount >= tier.Threshold
|
||
} else {
|
||
hit = p.InviteCount == tier.Threshold
|
||
}
|
||
}
|
||
if !hit {
|
||
s.logger.Debug("Tier not hit", zap.Int64("tier_id", tier.ID))
|
||
continue
|
||
}
|
||
s.logger.Info("Tier Hit! Granting...", zap.Int64("tier_id", tier.ID), zap.Int64("user_id", p.UserID))
|
||
if err := s.grantTierRewards(ctx, t.ID, tier.ID, p.UserID, sourceType, sourceID, eventID); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 安全更新状态
|
||
err := s.repo.GetDbW().Transaction(func(tx *gorm.DB) error {
|
||
var latestP tcmodel.UserTaskProgress
|
||
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id=?", p.ID).First(&latestP).Error; err != nil {
|
||
return err
|
||
}
|
||
var latestClaimed []int64
|
||
if len(latestP.ClaimedTiers) > 0 {
|
||
_ = json.Unmarshal([]byte(latestP.ClaimedTiers), &latestClaimed)
|
||
}
|
||
for _, id := range latestClaimed {
|
||
if id == tier.ID {
|
||
return nil
|
||
}
|
||
}
|
||
latestClaimed = append(latestClaimed, tier.ID)
|
||
b, _ := json.Marshal(latestClaimed)
|
||
latestP.ClaimedTiers = datatypes.JSON(b)
|
||
return tx.Model(&latestP).Update("claimed_tiers", latestP.ClaimedTiers).Error
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *service) grantTierRewards(ctx context.Context, taskID int64, tierID int64, userID int64, sourceType string, sourceID int64, eventID string) error {
|
||
var rewards []tcmodel.TaskReward
|
||
if err := s.repo.GetDbR().Where("task_id=? AND tier_id=?", taskID, tierID).Find(&rewards).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
// 容错处理:如果直接根据 tier_id 找不到奖励,可能是 ID 变更导致的。
|
||
// 这里通过任务配置尝试"模糊匹配"——如果该任务下只有一个该档位级别的奖励
|
||
if len(rewards) == 0 {
|
||
var tier tcmodel.TaskTier
|
||
if err := s.repo.GetDbR().First(&tier, tierID).Error; err == nil {
|
||
s.logger.Warn("Tier ID mismatch or no rewards configured", zap.Int64("tier_id", tierID))
|
||
}
|
||
return errors.New("no rewards configured for this tier")
|
||
}
|
||
|
||
idk := fmt.Sprintf("%d:%d:%d:%s:%d", userID, taskID, tierID, sourceType, sourceID)
|
||
var exists tcmodel.TaskEventLog
|
||
if err := s.repo.GetDbR().Where("idempotency_key=?", idk).First(&exists).Error; err == nil && exists.ID > 0 {
|
||
return nil
|
||
}
|
||
tx := s.repo.GetDbW().Begin()
|
||
s.logger.Info("Granting rewards for task",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("task_id", taskID),
|
||
zap.Int64("tier_id", tierID),
|
||
zap.String("event_id", eventID),
|
||
)
|
||
for _, r := range rewards {
|
||
var err error
|
||
switch r.RewardType {
|
||
case "points":
|
||
var pl struct {
|
||
Points int64 `json:"points"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
points := pl.Points
|
||
// 回退:如果 payload 中没有 points 字段,使用 quantity 字段
|
||
if points == 0 && r.Quantity > 0 {
|
||
points = r.Quantity
|
||
}
|
||
if points != 0 {
|
||
s.logger.Info("Granting points reward", zap.Int64("user_id", userID), zap.Int64("points", points))
|
||
err = s.userSvc.AddPoints(ctx, userID, points, "task_reward", "task_center", nil, nil)
|
||
}
|
||
case "coupon":
|
||
var pl struct {
|
||
CouponID int64 `json:"coupon_id"`
|
||
Quantity int `json:"quantity"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CouponID > 0 {
|
||
// BUG 修复:优先使用 r.Quantity(仅当 > 1 时),否则使用 payload,否则默认 1
|
||
qty := 1
|
||
if r.Quantity > 1 {
|
||
qty = int(r.Quantity)
|
||
} else if pl.Quantity > 0 {
|
||
qty = pl.Quantity
|
||
} else if r.Quantity == 1 {
|
||
qty = 1 // 显式设置为 1
|
||
}
|
||
s.logger.Info("Granting coupon reward", zap.Int64("user_id", userID), zap.Int64("coupon_id", pl.CouponID), zap.Int("quantity", qty))
|
||
for i := 0; i < qty; i++ {
|
||
if err = s.userSvc.AddCoupon(ctx, userID, pl.CouponID); err != nil {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
case "item_card":
|
||
var pl struct {
|
||
CardID int64 `json:"card_id"`
|
||
Quantity int `json:"quantity"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.CardID > 0 {
|
||
// BUG 修复:优先使用 r.Quantity(仅当 > 1 时),否则使用 payload,否则默认 1
|
||
qty := 1
|
||
if r.Quantity > 1 {
|
||
qty = int(r.Quantity)
|
||
} else if pl.Quantity > 0 {
|
||
qty = pl.Quantity
|
||
} else if r.Quantity == 1 {
|
||
qty = 1 // 显式设置为 1
|
||
}
|
||
s.logger.Info("Granting item card reward", zap.Int64("user_id", userID), zap.Int64("card_id", pl.CardID), zap.Int("quantity", qty))
|
||
err = s.userSvc.AddItemCard(ctx, userID, pl.CardID, qty)
|
||
}
|
||
case "title":
|
||
var pl struct {
|
||
TitleID int64 `json:"title_id"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.TitleID > 0 {
|
||
s.logger.Info("Granting title reward", zap.Int64("user_id", userID), zap.Int64("title_id", pl.TitleID))
|
||
err = s.titleSvc.AssignUserTitle(ctx, userID, pl.TitleID, nil, "task_center")
|
||
}
|
||
case "game_ticket":
|
||
var pl struct {
|
||
GameCode string `json:"game_code"`
|
||
Amount int `json:"amount"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.GameCode != "" {
|
||
// BUG 修复:增加对 r.Quantity 的支持,统一数量解析逻辑
|
||
amount := 1
|
||
if r.Quantity > 1 {
|
||
amount = int(r.Quantity)
|
||
} else if pl.Amount > 0 {
|
||
amount = pl.Amount
|
||
} else if r.Quantity == 1 {
|
||
amount = 1
|
||
}
|
||
s.logger.Info("Granting game ticket reward", zap.Int64("user_id", userID), zap.String("game_code", pl.GameCode), zap.Int("amount", amount))
|
||
gameSvc := gamesvc.NewTicketService(s.logger, s.repo)
|
||
err = gameSvc.GrantTicket(ctx, userID, pl.GameCode, amount, "task_center", taskID, "任务奖励")
|
||
}
|
||
case "product":
|
||
var pl struct {
|
||
ProductID int64 `json:"product_id"`
|
||
Quantity int `json:"quantity"`
|
||
}
|
||
_ = json.Unmarshal([]byte(r.RewardPayload), &pl)
|
||
if pl.ProductID > 0 {
|
||
// BUG 修复:优先使用 r.Quantity(仅当 > 1 时),否则使用 payload,否则默认 1
|
||
qty := 1
|
||
if r.Quantity > 1 {
|
||
qty = int(r.Quantity)
|
||
} else if pl.Quantity > 0 {
|
||
qty = pl.Quantity
|
||
} else if r.Quantity == 1 {
|
||
qty = 1 // 显式设置为 1
|
||
}
|
||
s.logger.Info("Granting product reward", zap.Int64("user_id", userID), zap.Int64("product_id", pl.ProductID), zap.Int("quantity", qty))
|
||
// 通过用户服务发放商品(创建待发货订单)
|
||
_, err = s.userSvc.GrantReward(ctx, userID, usersvc.GrantRewardRequest{
|
||
ProductID: pl.ProductID,
|
||
Quantity: qty,
|
||
Remark: "任务奖励",
|
||
})
|
||
}
|
||
default:
|
||
s.logger.Warn("Unknown reward type", zap.String("type", r.RewardType))
|
||
}
|
||
if err != nil {
|
||
s.logger.Error("Failed to grant reward",
|
||
zap.String("type", r.RewardType),
|
||
zap.Int64("user_id", userID),
|
||
zap.Error(err))
|
||
tx.Rollback()
|
||
return err
|
||
} else {
|
||
s.logger.Info("Successfully granted reward", zap.String("type", r.RewardType), zap.Int64("user_id", userID))
|
||
}
|
||
}
|
||
if err := tx.Create(&tcmodel.TaskEventLog{EventID: eventID, SourceType: sourceType, SourceID: sourceID, UserID: userID, TaskID: taskID, TierID: tierID, IdempotencyKey: idk, Status: "granted"}).Error; err != nil {
|
||
tx.Rollback()
|
||
return err
|
||
}
|
||
return tx.Commit().Error
|
||
}
|