Mq.php 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. use Illuminate\Support\Facades\Log;
  /**
   * Static helper around php-amqplib for publishing JSON messages to RabbitMQ
   * queues and for running a polling consumer.
   *
   * Connection settings are read from the `queue.connections.rabbitmq.*`
   * configuration keys (host, port, user, password, virtual_host).
   */
  class Mq
  {
      /** Queue that receives a report for every task that failed in worker(). */
      const ISSUES_QUEUE = 'issues';

      /**
       * Open a connection using the configured RabbitMQ settings.
       *
       * @return AMQPStreamConnection|null null (after logging an error) when any
       *                                   required setting is empty
       */
      private static function connection()
      {
          $settings = [
              'host'     => config("queue.connections.rabbitmq.host"),
              'port'     => config("queue.connections.rabbitmq.port"),
              'user'     => config("queue.connections.rabbitmq.user"),
              'password' => config("queue.connections.rabbitmq.password"),
              // Must be the virtual_host key; reading the password key here
              // would connect to a vhost named after the password.
              'vhost'    => config("queue.connections.rabbitmq.virtual_host"),
          ];

          foreach ($settings as $value) {
              if (empty($value)) {
                  Log::error('rabbitmq set error');
                  return null;
              }
          }

          return new AMQPStreamConnection(
              $settings['host'],
              $settings['port'],
              $settings['user'],
              $settings['password'],
              $settings['vhost']
          );
      }

      /**
       * JSON-encode a payload, failing loudly instead of yielding `false`.
       *
       * @param mixed $payload
       * @param int   $flags   json_encode() flags
       * @return string
       * @throws \RuntimeException when the payload cannot be encoded
       */
      private static function encode($payload, $flags = JSON_UNESCAPED_UNICODE)
      {
          $json = json_encode($payload, $flags);
          if ($json === false) {
              throw new \RuntimeException('mq json encode failed: ' . json_last_error_msg());
          }

          return $json;
      }

      /**
       * Publish a failure report for a task to the issues queue.
       *
       * @param AMQPStreamConnection $connection open connection of the worker
       * @param string               $exchange   exchange the worker is bound to
       * @param string               $queue      queue the failed message came from
       * @param string               $body       raw body of the failed message
       * @param \Throwable           $error      what went wrong
       */
      private static function reportIssue($connection, $exchange, $queue, $body, $error)
      {
          $issues = $connection->channel();
          $issues->queue_declare(self::ISSUES_QUEUE, false, true, false, false);

          $report = [
              'exchange' => $exchange,
              'channel'  => $queue,
              // Decoded afresh so the report carries the message as it was
              // received, not as the task callback may have modified it.
              'message'  => json_decode($body),
              'result'   => 0,
              // A Throwable has no public properties and would encode as {},
              // so the class name and message are stored as text instead.
              'error'    => get_class($error) . ': ' . $error->getMessage(),
          ];

          // Partial output keeps the report publishable even if the error text
          // contains bytes that JSON cannot represent.
          $json = self::encode($report, JSON_UNESCAPED_UNICODE | JSON_PARTIAL_OUTPUT_ON_ERROR);

          $issues->basic_publish(new AMQPMessage($json), '', self::ISSUES_QUEUE);
          $issues->close();
      }

      /**
       * Publish one message to a durable queue through the default exchange
       * (point-to-point: the queue name is used as the routing key).
       *
       * Failures are logged and swallowed; this method does not throw
       * \Exception to the caller.
       *
       * @param string $channelName name of the queue to publish to
       * @param mixed  $message     payload, sent as JSON
       * @return void
       */
      public static function publish(string $channelName, $message)
      {
          $connection = null;
          $channel = null;

          try {
              Log::debug('mq start {channel} {message}', ['channel' => $channelName, 'message' => $message]);

              $connection = self::connection();
              if ($connection === null) {
                  return;
              }

              $channel = $connection->channel();
              // passive: false, durable: true, exclusive: false, auto_delete: false
              $channel->queue_declare($channelName, false, true, false, false);
              $channel->basic_publish(new AMQPMessage(self::encode($message)), '', $channelName);
          } catch (\Exception $e) {
              Log::error($e);
          } finally {
              // Release the channel and connection on every path, including
              // failures, so a broken publish does not leak a socket.
              foreach ([$channel, $connection] as $resource) {
                  if ($resource === null) {
                      continue;
                  }
                  try {
                      $resource->close();
                  } catch (\Exception $e) {
                      Log::error($e);
                  }
              }
          }
      }

      /**
       * Consume messages from a queue until the consumer is cancelled.
       *
       * Declares the durable queue and a durable direct exchange, binds them,
       * and then polls for messages. Each message body is JSON-decoded and handed
       * to $callback, which must return 0 on success. Any other return value, or
       * anything thrown by the callback, is logged and reported to the issues
       * queue. The message is acknowledged in either case. After a task that
       * completed without throwing, the consumer is cancelled when
       * \App\Tools\Tools::isStop() is true.
       *
       * @param string        $exchange
       * @param string        $queue
       * @param callable|null $callback receives the decoded message; null means
       *                                messages are acknowledged without processing
       * @return void
       * @throws \RuntimeException when the RabbitMQ settings are incomplete
       */
      public static function worker($exchange, $queue, $callback = null)
      {
          $consumerTag = 'consumer';

          $connection = self::connection();
          if ($connection === null) {
              // connection() has already logged the problem; a worker that
              // cannot connect must still fail loudly.
              throw new \RuntimeException('rabbitmq set error');
          }
          $channel = $connection->channel();

          // Queue and exchange are declared on the consumer side as well, so
          // there is always something to consume from.
          // queue: passive false, durable true, exclusive false, auto_delete false
          $channel->queue_declare($queue, false, true, false, false);
          // exchange: type direct, passive false, durable true, auto_delete false
          $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
          $channel->queue_bind($queue, $exchange);

          /**
           * @param \PhpAmqpLib\Message\AMQPMessage $message
           */
          $processMessage = function ($message) use ($callback, $connection, $exchange, $queue) {
              if ($callback !== null) {
                  try {
                      $result = $callback(json_decode($message->body));

                      if (\App\Tools\Tools::isStop()) {
                          Log::debug('mq worker: .stop file exist. cancel the consumer.');
                          $message->getChannel()->basic_cancel($message->getConsumerTag());
                      }

                      if ($result !== 0) {
                          throw new \Exception('task error');
                      }
                  } catch (\Throwable $e) {
                      // The log context has to be an array; passing the
                      // throwable itself raises a TypeError inside this handler.
                      Log::error('mq worker exception', ['exception' => $e]);
                      self::reportIssue($connection, $exchange, $queue, $message->body, $e);
                  }
              }

              // Failed tasks are acknowledged too; they live on in the issues queue.
              $message->ack();
          };

          // no_local: false, no_ack: false (manual ack), exclusive: false, nowait: false
          $channel->basic_consume($queue, $consumerTag, false, false, false, false, $processMessage);

          // Close the channel and connection when the PHP process shuts down.
          register_shutdown_function(function () use ($channel, $connection) {
              $channel->close();
              $connection->close();
          });

          // Loop as long as the channel has callbacks registered.
          while ($channel->is_consuming()) {
              // Non-blocking wait, followed by a 300 ms pause between polls.
              $channel->wait(null, true);
              usleep(300000);
          }
      }
  }