| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- <?php
- namespace App\Jobs;
- use Illuminate\Bus\Queueable;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\Dispatchable;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\SerializesModels;
- use Illuminate\Support\Facades\Log;
- use Illuminate\Support\Facades\Config;
- use App\Exceptions\TaskFailException;
- abstract class BaseRabbitMQJob implements ShouldQueue
- {
- use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- protected $queueName;
- protected $messageData;
- protected $currentRetryCount = 0;
- protected $tries = 0;
- protected $timeout = 0;
- protected $messageId = null;
- protected $stop = false;
- public function __construct(string $queueName, string $messageId, array $messageData, int $retryCount = 0)
- {
- $this->queueName = $queueName;
- $this->messageData = $messageData;
- $this->currentRetryCount = $retryCount;
- $this->messageId = $messageId;
- // 从配置读取重试次数和超时时间
- $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
- $this->tries = $queueConfig['retry_times'] ?? 3;
- $this->timeout = $queueConfig['timeout'] ?? 300;
- }
- public function handle()
- {
- try {
- Log::info("开始处理队列消息", [
- 'queue' => $this->queueName,
- 'message_id' => $this->messageId ?? 'unknown',
- 'retry_count' => $this->currentRetryCount
- ]);
- // 调用子类的具体业务逻辑
- $result = $this->processMessage($this->messageData);
- Log::info("队列消息处理完成", [
- 'queue' => $this->queueName,
- 'message_id' => $this->messageId ?? 'unknown',
- 'result' => $result
- ]);
- return $result;
- } catch (TaskFailException $e) {
- $this->handleFinalFailure($this->messageData, $e);
- throw $e;
- } catch (\Exception $e) {
- Log::error("队列消息处理失败", [
- 'queue' => $this->queueName,
- 'message_id' => $this->messageId ?? 'unknown',
- 'error' => $e->getMessage(),
- 'retry_count' => $this->currentRetryCount,
- 'max_retries' => $this->tries
- ]);
- // 如果达到最大重试次数,处理失败逻辑
- if ($this->currentRetryCount >= $this->tries - 1) {
- $this->handleFinalFailure($this->messageData, $e);
- }
- throw $e; // 重新抛出异常以触发重试
- }
- }
- public function failed(\Exception $exception)
- {
- Log::error("队列消息最终失败", [
- 'queue' => $this->queueName,
- 'message_id' => $this->messageId ?? 'unknown',
- 'error' => $exception->getMessage(),
- 'retry_count' => $this->currentRetryCount
- ]);
- // 发送到死信队列的逻辑将在 Worker 中处理
- }
- // 子类需要实现的具体业务逻辑
- abstract protected function processMessage(array $messageData);
- // 子类可以重写的失败处理逻辑
- protected function handleFinalFailure(array $messageData, \Exception $exception)
- {
- // 默认实现:记录日志
- Log::error("消息处理最终失败,准备发送到死信队列", [
- 'queue' => $this->queueName,
- 'message_id' => $this->messageId ?? 'unknown',
- 'error' => $exception->getMessage()
- ]);
- }
- public function getQueueName(): string
- {
- return $this->queueName;
- }
- public function getCurrentRetryCount(): int
- {
- return $this->currentRetryCount;
- }
- public function stop()
- {
- $this->stop = true;
- }
- }
|