Mq.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  /**
   * Static helper around php-amqplib: publishes JSON messages to a queue and
   * runs a polling consumer loop that hands decoded messages to a callback.
   */
  class Mq
  {
      /**
       * Publish one JSON-encoded message to a durable queue (point-to-point).
       *
       * The message is published with an empty exchange name and the queue name
       * as routing key, and is marked persistent so that it matches the durable
       * queue it is written to.
       *
       * @param string $channelName Name of the queue to declare and publish to.
       * @param mixed  $message     Any value json_encode() can serialise.
       *
       * @throws \InvalidArgumentException When $message cannot be JSON-encoded.
       */
      public static function publish(string $channelName, $message)
      {
          // Encode first: an unserialisable payload must fail before a socket is
          // opened, and must never be published as an empty body.
          $body = json_encode($message, JSON_UNESCAPED_UNICODE);
          if ($body === false) {
              throw new \InvalidArgumentException(
                  'Unable to JSON-encode message: ' . json_last_error_msg()
              );
          }

          $connection = self::connect();
          try {
              $channel = $connection->channel();
              try {
                  // passive: false, durable: true, exclusive: false, auto_delete: false
                  $channel->queue_declare($channelName, false, true, false, false);

                  $msg = new AMQPMessage($body, [
                      'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                  ]);
                  $channel->basic_publish($msg, '', $channelName);
              } finally {
                  // Release the channel even when declaring or publishing throws.
                  $channel->close();
              }
          } finally {
              $connection->close();
          }
      }

      /**
       * Consume messages from $queue until the channel stops consuming.
       *
       * Declares the durable queue and a durable direct exchange, binds them
       * (no routing key is passed to queue_bind), then polls the channel roughly
       * every 300 ms. Each message body is passed through json_decode() and the
       * result is handed to $callback; the message is acknowledged after the
       * callback returns, so an exception thrown by $callback skips the ack.
       *
       * The channel and connection are closed by a shutdown function registered
       * on every call, not when the loop ends.
       *
       * @param string        $exchange Name of the direct exchange to declare and bind.
       * @param string        $queue    Name of the queue to declare and consume from.
       * @param callable|null $callback Receives the json_decode()d message body.
       */
      public static function worker($exchange, $queue, $callback = null)
      {
          $consumerTag = 'consumer';
          $connection = self::connect();
          $channel = $connection->channel();

          /*
              name: $queue
              passive: false
              durable: true       // the queue will survive server restarts
              exclusive: false    // the queue can be accessed in other channels
              auto_delete: false  // the queue won't be deleted once the channel is closed
          */
          $channel->queue_declare($queue, false, true, false, false);

          /*
              name: $exchange
              type: direct
              passive: false
              durable: true       // the exchange will survive server restarts
              auto_delete: false  // the exchange won't be deleted once the channel is closed
          */
          $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
          $channel->queue_bind($queue, $exchange);

          /**
           * @param \PhpAmqpLib\Message\AMQPMessage $message
           */
          $process_message = function ($message) use ($callback) {
              if ($callback !== null) {
                  $callback(json_decode($message->body));
              }
              // Manual acknowledgement: basic_consume below is called with no_ack = false.
              $message->ack();
              // A "quit" message could cancel the consumer here via
              // $message->getChannel()->basic_cancel($message->getConsumerTag());
              // that behaviour is currently disabled.
          };

          /*
              queue: Queue from where to get the messages
              consumer_tag: Consumer identifier
              no_local: Don't receive messages published by this consumer.
              no_ack: If set to true, automatic acknowledgement mode will be used by
                      this consumer. See https://www.rabbitmq.com/confirms.html
              exclusive: Request exclusive consumer access to the queue
              nowait:
              callback: A PHP callback
          */
          $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);

          /**
           * @param \PhpAmqpLib\Channel\AMQPChannel $channel
           * @param \PhpAmqpLib\Connection\AbstractConnection $connection
           */
          $shutdown = function ($channel, $connection) {
              $channel->close();
              $connection->close();
          };
          register_shutdown_function($shutdown, $channel, $connection);

          // Loop as long as the channel has callbacks registered.
          while ($channel->is_consuming()) {
              // Second argument true = non-blocking wait; the sleep below paces the poll.
              $channel->wait(null, true);
              usleep(300000);
          }
      }

      /**
       * Open a broker connection from the RABBITMQ_* settings read through env().
       *
       * NOTE(review): presumably this is Laravel's env() helper, which is
       * documented to return null outside config files once the configuration
       * is cached; confirm the deployment, or move these values to config().
       *
       * @return AMQPStreamConnection
       */
      private static function connect()
      {
          return new AMQPStreamConnection(
              env("RABBITMQ_HOST"),
              env("RABBITMQ_PORT"),
              env("RABBITMQ_USER"),
              env("RABBITMQ_PASSWORD"),
              env("RABBITMQ_VIRTUAL_HOST")
          );
      }
  }