BaseRabbitMQJob.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Bus\Queueable;
  4. use Illuminate\Contracts\Queue\ShouldQueue;
  5. use Illuminate\Foundation\Bus\Dispatchable;
  6. use Illuminate\Queue\InteractsWithQueue;
  7. use Illuminate\Queue\SerializesModels;
  8. use Illuminate\Support\Facades\Log;
  9. use Illuminate\Support\Facades\Config;
  10. abstract class BaseRabbitMQJob implements ShouldQueue
  11. {
  12. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  13. protected $queueName;
  14. protected $messageData;
  15. protected $currentRetryCount = 0;
  16. protected $tries = 0;
  17. protected $timeout = 0;
  18. protected $messageId = null;
  19. protected $stop = false;
  20. public function __construct(string $queueName, string $messageId, array $messageData, int $retryCount = 0)
  21. {
  22. $this->queueName = $queueName;
  23. $this->messageData = $messageData;
  24. $this->currentRetryCount = $retryCount;
  25. $this->messageId = $messageId;
  26. // 从配置读取重试次数和超时时间
  27. $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
  28. $this->tries = $queueConfig['retry_times'] ?? 3;
  29. $this->timeout = $queueConfig['timeout'] ?? 300;
  30. }
  31. public function handle()
  32. {
  33. try {
  34. Log::info("开始处理队列消息", [
  35. 'queue' => $this->queueName,
  36. 'message_id' => $this->messageId ?? 'unknown',
  37. 'retry_count' => $this->currentRetryCount
  38. ]);
  39. // 调用子类的具体业务逻辑
  40. $result = $this->processMessage($this->messageData);
  41. Log::info("队列消息处理完成", [
  42. 'queue' => $this->queueName,
  43. 'message_id' => $this->messageId ?? 'unknown',
  44. 'result' => $result
  45. ]);
  46. return $result;
  47. } catch (\Exception $e) {
  48. Log::error("队列消息处理失败", [
  49. 'queue' => $this->queueName,
  50. 'message_id' => $this->messageId ?? 'unknown',
  51. 'error' => $e->getMessage(),
  52. 'retry_count' => $this->currentRetryCount,
  53. 'max_retries' => $this->tries
  54. ]);
  55. // 如果达到最大重试次数,处理失败逻辑
  56. if ($this->currentRetryCount >= $this->tries - 1) {
  57. $this->handleFinalFailure($this->messageData, $e);
  58. }
  59. throw $e; // 重新抛出异常以触发重试
  60. }
  61. }
  62. public function failed(\Exception $exception)
  63. {
  64. Log::error("队列消息最终失败", [
  65. 'queue' => $this->queueName,
  66. 'message_id' => $this->messageId ?? 'unknown',
  67. 'error' => $exception->getMessage(),
  68. 'retry_count' => $this->currentRetryCount
  69. ]);
  70. // 发送到死信队列的逻辑将在 Worker 中处理
  71. }
  72. // 子类需要实现的具体业务逻辑
  73. abstract protected function processMessage(array $messageData);
  74. // 子类可以重写的失败处理逻辑
  75. protected function handleFinalFailure(array $messageData, \Exception $exception)
  76. {
  77. // 默认实现:记录日志
  78. Log::error("消息处理最终失败,准备发送到死信队列", [
  79. 'queue' => $this->queueName,
  80. 'message_id' => $this->messageId ?? 'unknown',
  81. 'error' => $exception->getMessage()
  82. ]);
  83. }
  84. public function getQueueName(): string
  85. {
  86. return $this->queueName;
  87. }
  88. public function getCurrentRetryCount(): int
  89. {
  90. return $this->currentRetryCount;
  91. }
  92. public function stop()
  93. {
  94. $this->stop = true;
  95. }
  96. }