2
0

BaseRabbitMQJob.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. <?php
  2. namespace App\Jobs;
  3. use Illuminate\Bus\Queueable;
  4. use Illuminate\Contracts\Queue\ShouldQueue;
  5. use Illuminate\Foundation\Bus\Dispatchable;
  6. use Illuminate\Queue\InteractsWithQueue;
  7. use Illuminate\Queue\SerializesModels;
  8. use Illuminate\Support\Facades\Log;
  9. use Illuminate\Support\Facades\Config;
  10. abstract class BaseRabbitMQJob implements ShouldQueue
  11. {
  12. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  13. protected $queueName;
  14. protected $messageData;
  15. protected $currentRetryCount = 0;
  16. protected $tries = 0;
  17. protected $timeout = 0;
  18. public function __construct(string $queueName, array $messageData, int $retryCount = 0)
  19. {
  20. $this->queueName = $queueName;
  21. $this->messageData = $messageData;
  22. $this->currentRetryCount = $retryCount;
  23. // 从配置读取重试次数和超时时间
  24. $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
  25. $this->tries = $queueConfig['retry_times'] ?? 3;
  26. $this->timeout = $queueConfig['timeout'] ?? 300;
  27. }
  28. public function handle($messageId = null)
  29. {
  30. try {
  31. Log::info("开始处理队列消息", [
  32. 'queue' => $this->queueName,
  33. 'message_id' => $this->messageData['id'] ?? 'unknown',
  34. 'retry_count' => $this->currentRetryCount
  35. ]);
  36. // 调用子类的具体业务逻辑
  37. $result = $this->processMessage($this->messageData);
  38. Log::info("队列消息处理完成", [
  39. 'queue' => $this->queueName,
  40. 'message_id' => $this->messageData['id'] ?? 'unknown',
  41. 'result' => $result
  42. ]);
  43. return $result;
  44. } catch (\Exception $e) {
  45. Log::error("队列消息处理失败", [
  46. 'queue' => $this->queueName,
  47. 'message_id' => $this->messageData['id'] ?? 'unknown',
  48. 'error' => $e->getMessage(),
  49. 'retry_count' => $this->currentRetryCount,
  50. 'max_retries' => $this->tries
  51. ]);
  52. // 如果达到最大重试次数,处理失败逻辑
  53. if ($this->currentRetryCount >= $this->tries - 1) {
  54. $this->handleFinalFailure($this->messageData, $e);
  55. }
  56. throw $e; // 重新抛出异常以触发重试
  57. }
  58. }
  59. public function failed(\Exception $exception)
  60. {
  61. Log::error("队列消息最终失败", [
  62. 'queue' => $this->queueName,
  63. 'message_id' => $this->messageData['id'] ?? 'unknown',
  64. 'error' => $exception->getMessage(),
  65. 'retry_count' => $this->currentRetryCount
  66. ]);
  67. // 发送到死信队列的逻辑将在 Worker 中处理
  68. }
  69. // 子类需要实现的具体业务逻辑
  70. abstract protected function processMessage(array $messageData);
  71. // 子类可以重写的失败处理逻辑
  72. protected function handleFinalFailure(array $messageData, \Exception $exception)
  73. {
  74. // 默认实现:记录日志
  75. Log::error("消息处理最终失败,准备发送到死信队列", [
  76. 'queue' => $this->queueName,
  77. 'message_data' => $messageData,
  78. 'error' => $exception->getMessage()
  79. ]);
  80. }
  81. public function getQueueName(): string
  82. {
  83. return $this->queueName;
  84. }
  85. public function getCurrentRetryCount(): int
  86. {
  87. return $this->currentRetryCount;
  88. }
  89. }