2
0

BaseRabbitMQJob.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Bus\Queueable;
  4. use Illuminate\Contracts\Queue\ShouldQueue;
  5. use Illuminate\Foundation\Bus\Dispatchable;
  6. use Illuminate\Queue\InteractsWithQueue;
  7. use Illuminate\Queue\SerializesModels;
  8. use Illuminate\Support\Facades\Log;
  9. use Illuminate\Support\Facades\Config;
  10. use App\Exceptions\TaskFailException;
  11. abstract class BaseRabbitMQJob implements ShouldQueue
  12. {
  13. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  14. protected $queueName;
  15. protected $messageData;
  16. protected $currentRetryCount = 0;
  17. protected $tries = 0;
  18. protected $timeout = 0;
  19. protected $messageId = null;
  20. protected $stop = false;
  21. public function __construct(string $queueName, string $messageId, array $messageData, int $retryCount = 0)
  22. {
  23. $this->queueName = $queueName;
  24. $this->messageData = $messageData;
  25. $this->currentRetryCount = $retryCount;
  26. $this->messageId = $messageId;
  27. // 从配置读取重试次数和超时时间
  28. $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
  29. $this->tries = $queueConfig['retry_times'] ?? 3;
  30. $this->timeout = $queueConfig['timeout'] ?? 300;
  31. }
  32. public function handle()
  33. {
  34. try {
  35. Log::info("开始处理队列消息", [
  36. 'queue' => $this->queueName,
  37. 'message_id' => $this->messageId ?? 'unknown',
  38. 'retry_count' => $this->currentRetryCount
  39. ]);
  40. // 调用子类的具体业务逻辑
  41. $result = $this->processMessage($this->messageData);
  42. Log::info("队列消息处理完成", [
  43. 'queue' => $this->queueName,
  44. 'message_id' => $this->messageId ?? 'unknown',
  45. 'result' => $result
  46. ]);
  47. return $result;
  48. } catch (TaskFailException $e) {
  49. $this->handleFinalFailure($this->messageData, $e);
  50. throw $e;
  51. } catch (\Exception $e) {
  52. Log::error("队列消息处理失败", [
  53. 'queue' => $this->queueName,
  54. 'message_id' => $this->messageId ?? 'unknown',
  55. 'error' => $e->getMessage(),
  56. 'retry_count' => $this->currentRetryCount,
  57. 'max_retries' => $this->tries
  58. ]);
  59. // 如果达到最大重试次数,处理失败逻辑
  60. if ($this->currentRetryCount >= $this->tries - 1) {
  61. $this->handleFinalFailure($this->messageData, $e);
  62. }
  63. throw $e; // 重新抛出异常以触发重试
  64. }
  65. }
  66. public function failed(\Exception $exception)
  67. {
  68. Log::error("队列消息最终失败", [
  69. 'queue' => $this->queueName,
  70. 'message_id' => $this->messageId ?? 'unknown',
  71. 'error' => $exception->getMessage(),
  72. 'retry_count' => $this->currentRetryCount
  73. ]);
  74. // 发送到死信队列的逻辑将在 Worker 中处理
  75. }
  76. // 子类需要实现的具体业务逻辑
  77. abstract protected function processMessage(array $messageData);
  78. // 子类可以重写的失败处理逻辑
  79. protected function handleFinalFailure(array $messageData, \Exception $exception)
  80. {
  81. // 默认实现:记录日志
  82. Log::error("消息处理最终失败,准备发送到死信队列", [
  83. 'queue' => $this->queueName,
  84. 'message_id' => $this->messageId ?? 'unknown',
  85. 'error' => $exception->getMessage()
  86. ]);
  87. }
  88. public function getQueueName(): string
  89. {
  90. return $this->queueName;
  91. }
  92. public function getCurrentRetryCount(): int
  93. {
  94. return $this->currentRetryCount;
  95. }
  96. public function stop()
  97. {
  98. $this->stop = true;
  99. }
  100. }