TestMqWorker.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. class TestMqWorker extends Command
  7. {
  8. /**
  9. * The name and signature of the console command.
  10. *
  11. * @var string
  12. */
  13. protected $signature = 'test:mq.worker';
  14. /**
  15. * The console command description.
  16. *
  17. * @var string
  18. */
  19. protected $description = 'Command description';
  20. /**
  21. * Create a new command instance.
  22. *
  23. * @return void
  24. */
  25. public function __construct()
  26. {
  27. parent::__construct();
  28. }
  29. /**
  30. * Execute the console command.
  31. *
  32. * @return int
  33. */
  34. public function handle()
  35. {
  36. $exchange = 'router';
  37. $queue = 'hello';
  38. $consumerTag = 'consumer';
  39. $connection = new AMQPStreamConnection(env("RABBITMQ_HOST"),
  40. env("RABBITMQ_PORT"),
  41. env("RABBITMQ_USER"),
  42. env("RABBITMQ_PASSWORD"),
  43. env("RABBITMQ_VIRTUAL_HOST"));
  44. $channel = $connection->channel();
  45. /*
  46. The following code is the same both in the consumer and the producer.
  47. In this way we are sure we always have a queue to consume from and an
  48. exchange where to publish messages.
  49. */
  50. /*
  51. name: $queue
  52. passive: false
  53. durable: true // the queue will survive server restarts
  54. exclusive: false // the queue can be accessed in other channels
  55. auto_delete: false //the queue won't be deleted once the channel is closed.
  56. */
  57. $channel->queue_declare($queue, false, true, false, false);
  58. /*
  59. name: $exchange
  60. type: direct
  61. passive: false
  62. durable: true // the exchange will survive server restarts
  63. auto_delete: false //the exchange won't be deleted once the channel is closed.
  64. */
  65. $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
  66. $channel->queue_bind($queue, $exchange);
  67. /**
  68. * @param \PhpAmqpLib\Message\AMQPMessage $message
  69. */
  70. $process_message = function ($message)
  71. {
  72. echo "\n--------\n";
  73. echo $message->body;
  74. echo "\n--------\n";
  75. $message->ack();
  76. // Send a message with the string "quit" to cancel the consumer.
  77. if ($message->body === 'quit') {
  78. $message->getChannel()->basic_cancel($message->getConsumerTag());
  79. }
  80. };
  81. /*
  82. queue: Queue from where to get the messages
  83. consumer_tag: Consumer identifier
  84. no_local: Don't receive messages published by this consumer.
  85. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
  86. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
  87. nowait:
  88. callback: A PHP Callback
  89. */
  90. $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);
  91. /**
  92. * @param \PhpAmqpLib\Channel\AMQPChannel $channel
  93. * @param \PhpAmqpLib\Connection\AbstractConnection $connection
  94. */
  95. $shutdown = function ($channel, $connection)
  96. {
  97. $channel->close();
  98. $connection->close();
  99. };
  100. register_shutdown_function($shutdown, $channel, $connection);
  101. // Loop as long as the channel has callbacks registered
  102. while ($channel->is_consuming()) {
  103. $channel->wait(null, true);
  104. // do something else
  105. usleep(300000);
  106. }
  107. return 0;
  108. }
  109. }