ProcessDeadLetterQueue.php 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use Illuminate\Support\Facades\Log;
  7. class ProcessDeadLetterQueue extends Command
  8. {
  9. /**
  10. * The name and signature of the console command.
  11. * 查看死信队列消息
  12. * php artisan rabbitmq:process-dlq orders_dlq
  13. *
  14. * 重新入队死信消息
  15. * php artisan rabbitmq:process-dlq orders_dlq --requeue
  16. *
  17. * 删除死信消息
  18. * php artisan rabbitmq:process-dlq orders_dlq --delete
  19. * @var string
  20. */
  21. protected $signature = 'rabbitmq:process-dlq {dlq_name} {--requeue} {--delete}';
  22. protected $description = '处理死信队列中的消息';
  23. public function handle()
  24. {
  25. $dlqName = $this->argument('dlq_name');
  26. $requeue = $this->option('requeue');
  27. $delete = $this->option('delete');
  28. $config = config('queue.connections.rabbitmq');
  29. $connection = new AMQPStreamConnection(
  30. $config['host'],
  31. $config['port'],
  32. $config['user'],
  33. $config['password'],
  34. $config['virtual_host']
  35. );
  36. $channel = $connection->channel();
  37. $this->info("开始处理死信队列: {$dlqName}");
  38. $messageCount = 0;
  39. while (true) {
  40. $msg = $channel->basic_get($dlqName, false);
  41. if (!$msg) {
  42. break; // 队列为空
  43. }
  44. $messageCount++;
  45. $data = json_decode($msg->body, true);
  46. $this->info("处理第 {$messageCount} 条死信消息");
  47. $this->line("原始队列: " . ($data['queue'] ?? 'unknown'));
  48. $this->line("失败原因: " . ($data['failure_reason'] ?? 'unknown'));
  49. $this->line("失败时间: " . ($data['failed_at'] ?? 'unknown'));
  50. if ($requeue) {
  51. // 重新入队到原始队列
  52. $originalQueue = $data['queue'] ?? null;
  53. if ($originalQueue && isset($data['original_message'])) {
  54. $requeueMsg = new AMQPMessage(
  55. json_encode($data['original_message']),
  56. ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
  57. );
  58. $channel->basic_publish($requeueMsg, '', $originalQueue);
  59. $this->info("消息已重新入队到: {$originalQueue}");
  60. }
  61. }
  62. if ($delete || $requeue) {
  63. $msg->ack();
  64. } else {
  65. // 只是查看,不删除
  66. $msg->nack(false, true);
  67. }
  68. }
  69. $this->info("死信队列处理完成,共处理 {$messageCount} 条消息");
  70. $channel->close();
  71. $connection->close();
  72. return 0;
  73. }
  74. }