Mq.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. use Illuminate\Support\Facades\Log;
  7. class Mq{
  8. public static function publish(string $channelName, $message){
  9. //一对一
  10. try{
  11. $host = env("RABBITMQ_HOST");
  12. $port = env("RABBITMQ_PORT");
  13. $user = env("RABBITMQ_USER");
  14. $password = env("RABBITMQ_PASSWORD");
  15. $vhost = env("RABBITMQ_VIRTUAL_HOST");
  16. if(empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)){
  17. return;
  18. }
  19. $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
  20. $channel = $connection->channel();
  21. $channel->queue_declare($channelName, false, true, false, false);
  22. $msg = new AMQPMessage(json_encode($message,JSON_UNESCAPED_UNICODE));
  23. $channel->basic_publish($msg, '', $channelName);
  24. $channel->close();
  25. $connection->close();
  26. }catch(\Exception $e){
  27. Log::error($e);
  28. return;
  29. }
  30. }
  31. /**
  32. * @param string $exchange
  33. * @param string $queue
  34. * @param callable|null $callback
  35. */
  36. public static function worker($exchange,$queue,$callback=null){
  37. $consumerTag = 'consumer';
  38. $connection = new AMQPStreamConnection(env("RABBITMQ_HOST"),
  39. env("RABBITMQ_PORT"),
  40. env("RABBITMQ_USER"),
  41. env("RABBITMQ_PASSWORD"),
  42. env("RABBITMQ_VIRTUAL_HOST"));
  43. $channel = $connection->channel();
  44. /*
  45. The following code is the same both in the consumer and the producer.
  46. In this way we are sure we always have a queue to consume from and an
  47. exchange where to publish messages.
  48. */
  49. /*
  50. name: $queue
  51. passive: false
  52. durable: true // the queue will survive server restarts
  53. exclusive: false // the queue can be accessed in other channels
  54. auto_delete: false //the queue won't be deleted once the channel is closed.
  55. */
  56. $channel->queue_declare($queue, false, true, false, false);
  57. /*
  58. name: $exchange
  59. type: direct
  60. passive: false
  61. durable: true // the exchange will survive server restarts
  62. auto_delete: false //the exchange won't be deleted once the channel is closed.
  63. */
  64. $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
  65. $channel->queue_bind($queue, $exchange);
  66. /**
  67. * @param \PhpAmqpLib\Message\AMQPMessage $message
  68. */
  69. $process_message = function ($message) use($callback,$connection,$exchange,$queue)
  70. {
  71. if($callback !== null){
  72. try{
  73. $result = $callback(json_decode($message->body));
  74. if($result !== 0){
  75. throw new \Exception('error');
  76. }
  77. }catch(\Exception $e){
  78. // push to issues
  79. $channelName = 'issues';
  80. $channelIssues = $connection->channel();
  81. $channelIssues->queue_declare($channelName, false, true, false, false);
  82. $msg = new AMQPMessage(json_encode(['exchange'=>$exchange,
  83. 'channel'=>$queue,
  84. 'message'=>json_decode($message->body),
  85. 'result'=>$result,
  86. 'error'=>$e,
  87. ],JSON_UNESCAPED_UNICODE));
  88. $channelIssues->basic_publish($msg, '', $channelName);
  89. $channelIssues->close();
  90. }
  91. }
  92. $message->ack();
  93. // Send a message with the string "quit" to cancel the consumer.
  94. /*
  95. if ($message->body === 'quit') {
  96. $message->getChannel()->basic_cancel($message->getConsumerTag());
  97. }
  98. */
  99. };
  100. /*
  101. queue: Queue from where to get the messages
  102. consumer_tag: Consumer identifier
  103. no_local: Don't receive messages published by this consumer.
  104. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
  105. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
  106. nowait:
  107. callback: A PHP Callback
  108. */
  109. $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);
  110. /**
  111. * @param \PhpAmqpLib\Channel\AMQPChannel $channel
  112. * @param \PhpAmqpLib\Connection\AbstractConnection $connection
  113. */
  114. $shutdown = function ($channel, $connection)
  115. {
  116. $channel->close();
  117. $connection->close();
  118. };
  119. register_shutdown_function($shutdown, $channel, $connection);
  120. // Loop as long as the channel has callbacks registered
  121. while ($channel->is_consuming()) {
  122. $channel->wait(null, true);
  123. // do something else
  124. usleep(300000);
  125. }
  126. }
  127. }