Mq.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. use Illuminate\Support\Facades\Log;
  7. class Mq
  8. {
  9. private static function connection()
  10. {
  11. $host = config("queue.connections.rabbitmq.host");
  12. $port = config("queue.connections.rabbitmq.port");
  13. $user = config("queue.connections.rabbitmq.user");
  14. $password = config("queue.connections.rabbitmq.password");
  15. $vhost = config("queue.connections.rabbitmq.password");
  16. if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
  17. Log::error('rabbitmq set error');
  18. return;
  19. }
  20. $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
  21. return $connection;
  22. }
  23. public static function publish(string $channelName, $message)
  24. {
  25. //一对一
  26. try {
  27. Log::debug('mq publish {channel} {message}', ['channel' => $channelName, 'message' => $message]);
  28. $host = config("queue.connections.rabbitmq.host");
  29. $port = config("queue.connections.rabbitmq.port");
  30. $user = config("queue.connections.rabbitmq.user");
  31. $password = config("queue.connections.rabbitmq.password");
  32. $vhost = config("queue.connections.rabbitmq.virtual_host");
  33. if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
  34. Log::error('rabbitmq set error');
  35. return;
  36. }
  37. $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
  38. $channel = $connection->channel();
  39. $channel->queue_declare($channelName, false, true, false, false);
  40. $msg = new AMQPMessage(json_encode($message, JSON_UNESCAPED_UNICODE));
  41. $channel->basic_publish($msg, '', $channelName);
  42. $channel->close();
  43. $connection->close();
  44. } catch (\Exception $e) {
  45. Log::error($e);
  46. return;
  47. }
  48. }
  49. /**
  50. * @param string $exchange
  51. * @param string $queue
  52. * @param callable|null $callback
  53. */
  54. public static function worker($exchange, $queue, $callback = null)
  55. {
  56. $consumerTag = 'consumer';
  57. $host = config("queue.connections.rabbitmq.host");
  58. $port = config("queue.connections.rabbitmq.port");
  59. $user = config("queue.connections.rabbitmq.user");
  60. $password = config("queue.connections.rabbitmq.password");
  61. $vhost = config("queue.connections.rabbitmq.virtual_host");
  62. $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
  63. $channel = $connection->channel();
  64. /*
  65. The following code is the same both in the consumer and the producer.
  66. In this way we are sure we always have a queue to consume from and an
  67. exchange where to publish messages.
  68. */
  69. /*
  70. name: $queue
  71. passive: false
  72. durable: true // the queue will survive server restarts
  73. exclusive: false // the queue can be accessed in other channels
  74. auto_delete: false //the queue won't be deleted once the channel is closed.
  75. */
  76. $channel->queue_declare($queue, false, true, false, false);
  77. /*
  78. name: $exchange
  79. type: direct
  80. passive: false
  81. durable: true // the exchange will survive server restarts
  82. auto_delete: false //the exchange won't be deleted once the channel is closed.
  83. */
  84. $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
  85. $channel->queue_bind($queue, $exchange);
  86. /**
  87. * @param \PhpAmqpLib\Message\AMQPMessage $message
  88. */
  89. $process_message = function ($message) use ($callback, $connection, $exchange, $queue) {
  90. if ($callback !== null) {
  91. try {
  92. $result = $callback(json_decode($message->body));
  93. if (\App\Tools\Tools::isStop()) {
  94. Log::debug('mq worker: .stop file exist. cancel the consumer.');
  95. $message->getChannel()->basic_cancel($message->getConsumerTag());
  96. }
  97. if ($result !== 0) {
  98. throw new \Exception('task error');
  99. }
  100. } catch (\Exception $e) {
  101. // push to issues
  102. Log::error('mq worker exception', ['exception'=>$e] );
  103. $channelName = 'issues';
  104. $channelIssues = $connection->channel();
  105. $channelIssues->queue_declare($channelName, false, true, false, false);
  106. $msg = new AMQPMessage(json_encode([
  107. 'exchange' => $exchange,
  108. 'channel' => $queue,
  109. 'message' => json_decode($message->body),
  110. 'result' => 0,
  111. 'error' => $e,
  112. ], JSON_UNESCAPED_UNICODE));
  113. $channelIssues->basic_publish($msg, '', $channelName);
  114. $channelIssues->close();
  115. }
  116. }
  117. $message->ack();
  118. // Send a message with the string "quit" to cancel the consumer.
  119. /*
  120. if ($message->body === 'quit') {
  121. $message->getChannel()->basic_cancel($message->getConsumerTag());
  122. }
  123. */
  124. };
  125. /*
  126. queue: Queue from where to get the messages
  127. consumer_tag: Consumer identifier
  128. no_local: Don't receive messages published by this consumer.
  129. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
  130. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
  131. nowait:
  132. callback: A PHP Callback
  133. */
  134. $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);
  135. /**
  136. * @param \PhpAmqpLib\Channel\AMQPChannel $channel
  137. * @param \PhpAmqpLib\Connection\AbstractConnection $connection
  138. */
  139. $shutdown = function ($channel, $connection) {
  140. $channel->close();
  141. $connection->close();
  142. };
  143. register_shutdown_function($shutdown, $channel, $connection);
  144. // Loop as long as the channel has callbacks registered
  145. while ($channel->is_consuming()) {
  146. $channel->wait(null, true);
  147. // do something else
  148. usleep(300000);
  149. }
  150. }
  151. }