TestMqWorker.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  /**
   * Artisan command that consumes messages from a RabbitMQ queue and echoes
   * each message body to standard output.
   *
   * The command keeps polling until the consumer is cancelled, which happens
   * when a message whose body is exactly "quit" is received.
   */
  class TestMqWorker extends Command
  {
      /**
       * The name and signature of the console command.
       *
       * Invoke with: php artisan test:mq.worker
       *
       * @var string
       */
      protected $signature = 'test:mq.worker';

      /**
       * The console command description.
       *
       * @var string
       */
      protected $description = 'Command description';

      /**
       * Execute the console command.
       *
       * Declares the durable queue "hello" and the durable direct exchange
       * "router", binds them together, and then consumes messages with manual
       * acknowledgement. The channel and the connection are always closed
       * before this method returns or propagates an exception.
       *
       * Returns immediately, without connecting, when
       * \App\Tools\Tools::isStop() is truthy.
       * NOTE(review): presumably isStop() is an application-wide "stop
       * workers" switch - confirm against its implementation.
       *
       * @return int Always 0.
       */
      public function handle()
      {
          if (\App\Tools\Tools::isStop()) {
              return 0;
          }

          $exchange = 'router';
          $queue = 'hello';
          $consumerTag = 'consumer';

          $connection = new AMQPStreamConnection(
              config('queue.connections.rabbitmq.host'),
              config('queue.connections.rabbitmq.port'),
              config('queue.connections.rabbitmq.user'),
              config('queue.connections.rabbitmq.password'),
              config('queue.connections.rabbitmq.virtual_host')
          );

          // try/finally (rather than a shutdown function) releases the socket
          // as soon as handle() finishes, even when the command is run inside
          // a longer-lived PHP process, and also when an exception escapes.
          try {
              $channel = $connection->channel();

              try {
                  /*
                   * The declarations below are the same in the consumer and
                   * the producer, so whichever starts first creates the queue
                   * and the exchange.
                   *
                   * queue_declare arguments:
                   *   name:        $queue
                   *   passive:     false
                   *   durable:     true  - the queue survives server restarts
                   *   exclusive:   false - the queue can be accessed in other channels
                   *   auto_delete: false - the queue is kept once the channel is closed
                   */
                  $channel->queue_declare($queue, false, true, false, false);

                  /*
                   * exchange_declare arguments:
                   *   name:        $exchange
                   *   type:        direct
                   *   passive:     false
                   *   durable:     true  - the exchange survives server restarts
                   *   auto_delete: false - the exchange is kept once the channel is closed
                   */
                  $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

                  $channel->queue_bind($queue, $exchange);

                  // Prints the body between separator lines, acknowledges the
                  // message, and cancels this consumer when the body is "quit".
                  $processMessage = static function (\PhpAmqpLib\Message\AMQPMessage $message) {
                      echo "\n--------\n";
                      echo $message->body;
                      echo "\n--------\n";

                      $message->ack();

                      // Send a message with the string "quit" to cancel the consumer.
                      if ($message->body === 'quit') {
                          $message->getChannel()->basic_cancel($message->getConsumerTag());
                      }
                  };

                  /*
                   * basic_consume arguments:
                   *   queue:        queue to read messages from
                   *   consumer_tag: consumer identifier
                   *   no_local:     false
                   *   no_ack:       false - messages are acknowledged explicitly in the callback
                   *   exclusive:    false - other consumers may use the queue
                   *   nowait:       false
                   *   callback:     invoked for every delivered message
                   */
                  $channel->basic_consume($queue, $consumerTag, false, false, false, false, $processMessage);

                  // Loop as long as the channel has callbacks registered.
                  while ($channel->is_consuming()) {
                      // Second argument requests a non-blocking wait, so the
                      // loop can do other work between polls.
                      $channel->wait(null, true);
                      // do something else
                      usleep(300000);
                  }
              } finally {
                  $channel->close();
              }
          } finally {
              $connection->close();
          }

          return 0;
      }
  }