package taskcenter

import (
	"context"
	"encoding/json"
	"time"

	"bindbox-game/internal/pkg/async"

	"go.uber.org/zap"
)

// StartWorker launches the task center's background event consumers.
//
// When no async queue is configured it logs that fact and returns without
// starting anything. Otherwise it spawns a fixed number of goroutines, each
// running runWorkerLoop until ctx is cancelled. The call does not block and
// does not wait for the workers to exit.
func (s *service) StartWorker(ctx context.Context) {
	if s.queue == nil {
		s.logger.Info("Async queue not configured, worker not started")
		return
	}

	s.logger.Info("Task center worker started")

	// Several consumers run side by side so events are handled concurrently.
	const workerCount = 5
	for i := 0; i < workerCount; i++ {
		go s.runWorkerLoop(ctx, i)
	}
}

// runWorkerLoop repeatedly consumes events from the queue and dispatches them
// by type until ctx is cancelled.
//
// workerID is only used to tag log lines. Consume failures are logged and
// retried after a short delay. A panic anywhere in the loop is recovered,
// logged, and the worker is restarted after a back-off, unless ctx has been
// cancelled in the meantime, in which case the worker simply ends.
func (s *service) runWorkerLoop(ctx context.Context, workerID int) {
	const (
		panicRestartDelay = 3 * time.Second
		consumeRetryDelay = 1 * time.Second
	)

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		s.logger.Error("Task center worker panicked", zap.Any("recover", r), zap.Int("worker_id", workerID))

		// Back off before restarting so a persistent panic cannot turn into a
		// tight loop, but do not outlive a shutdown: if ctx is cancelled
		// during the wait, no replacement worker is spawned.
		if !sleepWithContext(ctx, panicRestartDelay) {
			s.logger.Info("Task center worker stopping", zap.Int("worker_id", workerID))
			return
		}
		go s.runWorkerLoop(ctx, workerID)
	}()

	s.logger.Info("Worker routine started", zap.Int("worker_id", workerID))

	for {
		if ctx.Err() != nil {
			s.logger.Info("Task center worker stopping", zap.Int("worker_id", workerID))
			return
		}

		event, err := s.queue.Consume(ctx)
		if err != nil {
			// A failure observed after cancellation is treated as part of
			// shutdown rather than a queue problem; the check at the top of
			// the loop logs the stop and returns.
			if ctx.Err() != nil {
				continue
			}
			s.logger.Error("Failed to consume event", zap.Error(err), zap.Int("worker_id", workerID))
			// Interruptible wait: the result is ignored because the next
			// iteration re-checks ctx either way.
			sleepWithContext(ctx, consumeRetryDelay)
			continue
		}
		if event == nil {
			// Nothing to process this round.
			continue
		}

		s.logger.Info("Processing event", zap.String("type", string(event.Type)), zap.Int("worker_id", workerID))

		switch event.Type {
		case async.EventTypeOrderPaid:
			var pl async.OrderPaidPayload
			if err := json.Unmarshal([]byte(event.Payload), &pl); err != nil {
				s.logger.Error("Failed to unmarshal order paid payload", zap.Error(err), zap.Int("worker_id", workerID))
				continue
			}
			if err := s.processOrderPaid(ctx, pl.UserID, pl.OrderID); err != nil {
				s.logger.Error("Failed to process order paid", zap.Error(err), zap.Int("worker_id", workerID))
			}

		case async.EventTypeInviteSuccess:
			var pl async.InviteSuccessPayload
			if err := json.Unmarshal([]byte(event.Payload), &pl); err != nil {
				s.logger.Error("Failed to unmarshal invite success payload", zap.Error(err), zap.Int("worker_id", workerID))
				continue
			}
			if err := s.processInviteSuccess(ctx, pl.InviterID, pl.InviteeID); err != nil {
				s.logger.Error("Failed to process invite success", zap.Error(err), zap.Int("worker_id", workerID))
			}

		default:
			s.logger.Warn("Unknown event type", zap.String("type", string(event.Type)), zap.Int("worker_id", workerID))
		}
	}
}

// sleepWithContext waits for d or until ctx is cancelled, whichever happens
// first. It reports true when the full duration elapsed and false when the
// wait was cut short by cancellation.
func sleepWithContext(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}