邹方成 a7a0f639e1 feat: 新增取消发货功能并优化任务中心
fix: 修复微信通知字段截断导致的编码错误
feat: 添加有效邀请相关字段和任务中心常量
refactor: 重构一番赏奖品格位逻辑
perf: 优化道具卡列表聚合显示
docs: 更新项目说明文档和API文档
test: 添加字符串截断工具测试
2025-12-23 22:26:07 +08:00

81 lines
2.3 KiB
Go

package taskcenter
import (
"bindbox-game/internal/pkg/async"
"context"
"encoding/json"
"time"
"go.uber.org/zap"
)
func (s *service) StartWorker(ctx context.Context) {
if s.queue == nil {
s.logger.Info("Async queue not configured, worker not started")
return
}
s.logger.Info("Task center worker started")
// Start multiple workers for concurrency
workerCount := 5
for i := 0; i < workerCount; i++ {
go s.runWorkerLoop(ctx, i)
}
}
func (s *service) runWorkerLoop(ctx context.Context, workerID int) {
defer func() {
if r := recover(); r != nil {
s.logger.Error("Task center worker panicked", zap.Any("recover", r), zap.Int("worker_id", workerID))
// Restart worker after a short delay to prevent tight loops
time.Sleep(3 * time.Second)
go s.runWorkerLoop(ctx, workerID)
}
}()
s.logger.Info("Worker routine started", zap.Int("worker_id", workerID))
for {
select {
case <-ctx.Done():
s.logger.Info("Task center worker stopping", zap.Int("worker_id", workerID))
return
default:
event, err := s.queue.Consume(ctx)
if err != nil {
s.logger.Error("Failed to consume event", zap.Error(err), zap.Int("worker_id", workerID))
time.Sleep(1 * time.Second)
continue
}
if event == nil {
continue
}
s.logger.Info("Processing event", zap.String("type", string(event.Type)), zap.Int("worker_id", workerID))
switch event.Type {
case async.EventTypeOrderPaid:
var pl async.OrderPaidPayload
if err := json.Unmarshal([]byte(event.Payload), &pl); err != nil {
s.logger.Error("Failed to unmarshal order paid payload", zap.Error(err))
continue
}
if err := s.processOrderPaid(ctx, pl.UserID, pl.OrderID); err != nil {
s.logger.Error("Failed to process order paid", zap.Error(err))
}
case async.EventTypeInviteSuccess:
var pl async.InviteSuccessPayload
if err := json.Unmarshal([]byte(event.Payload), &pl); err != nil {
s.logger.Error("Failed to unmarshal invite success payload", zap.Error(err))
continue
}
if err := s.processInviteSuccess(ctx, pl.InviterID, pl.InviteeID); err != nil {
s.logger.Error("Failed to process invite success", zap.Error(err))
}
default:
s.logger.Warn("Unknown event type", zap.String("type", string(event.Type)))
}
}
}
}