Mq.php 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. use PhpAmqpLib\Exception\AMQPTimeoutException;
  7. use PhpAmqpLib\Exception\AMQPProtocolChannelException;
  8. use Illuminate\Support\Facades\Log;
  9. use Illuminate\Support\Str;
  /**
   * Static helper around php-amqplib for publishing JSON messages to RabbitMQ
   * queues and for running a blocking consumer loop.
   *
   * Connection settings are read from the `queue.connections.rabbitmq.*`
   * configuration keys (host, port, user, password, virtual_host).
   */
  class Mq
  {
      /**
       * Open a connection using the configured RabbitMQ settings.
       *
       * Logs an error and returns null when any required setting is empty.
       * Exceptions thrown while connecting are not caught here; callers decide
       * how to handle them.
       *
       * @return AMQPStreamConnection|null
       */
      private static function connection(): ?AMQPStreamConnection
      {
          $host = config("queue.connections.rabbitmq.host");
          $port = config("queue.connections.rabbitmq.port");
          $user = config("queue.connections.rabbitmq.user");
          $password = config("queue.connections.rabbitmq.password");
          // The vhost lives under "virtual_host"; reading the "password" key here
          // would silently connect to a vhost named after the password.
          $vhost = config("queue.connections.rabbitmq.virtual_host");

          if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
              Log::error('rabbitmq set error');
              return null;
          }

          return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
      }

      /**
       * Close a channel and its connection, in that order, without letting a
       * failure during cleanup escape (it is logged instead).
       *
       * @param \PhpAmqpLib\Channel\AMQPChannel|null $channel
       * @param \PhpAmqpLib\Connection\AbstractConnection|null $connection
       */
      private static function release($channel, $connection)
      {
          foreach ([$channel, $connection] as $resource) {
              if ($resource === null) {
                  continue;
              }
              try {
                  $resource->close();
              } catch (\Throwable $e) {
                  // Cleanup must never mask the primary failure.
                  Log::error($e);
              }
          }
      }

      /**
       * Read a message property, returning null when it is not set.
       *
       * AMQPMessage::get() throws OutOfBoundsException for a missing property,
       * which happens for messages published without message_id/content_type.
       *
       * @param AMQPMessage $message
       * @param string $name
       * @return mixed|null
       */
      private static function property(AMQPMessage $message, string $name)
      {
          return $message->has($name) ? $message->get($name) : null;
      }

      /**
       * Publish one JSON-encoded message directly to a queue (one-to-one,
       * through the default exchange with the queue name as routing key).
       *
       * Failures are logged and swallowed; nothing is returned.
       *
       * @param string $queue   Name of the durable queue to declare and publish to.
       * @param mixed  $message Any JSON-encodable payload.
       */
      public static function publish(string $queue, $message)
      {
          $connection = null;
          $channel = null;

          try {
              Log::debug('mq publish', ['queue' => $queue, 'message' => $message]);

              $connection = self::connection();
              if ($connection === null) {
                  return;
              }

              $channel = $connection->channel();
              // passive=false, durable=true, exclusive=false, auto_delete=false
              $channel->queue_declare($queue, false, true, false, false);

              $body = json_encode($message, JSON_UNESCAPED_UNICODE);
              if ($body === false) {
                  // Do not publish a broken (empty) body when encoding fails.
                  Log::error('mq publish: message is not JSON-encodable', [
                      'queue' => $queue,
                      'error' => json_last_error_msg(),
                  ]);
                  return;
              }

              // message_id is an AMQP short string, so pass a string rather than the Uuid object.
              $msgId = (string) Str::uuid();
              Log::info("mq push message queue={$queue} id={$msgId}");

              $msg = new AMQPMessage(
                  $body,
                  [
                      "message_id" => $msgId,
                      "content_type" => 'application/json; charset=utf-8',
                  ]
              );
              $channel->basic_publish($msg, '', $queue);
          } catch (\Exception $e) {
              Log::error($e);
          } finally {
              // Always release the channel/connection, including on failure paths.
              self::release($channel, $connection);
          }
      }

      /**
       * Consume messages from a queue until the consumer is cancelled.
       *
       * Declares the durable queue and a durable direct exchange, binds them,
       * then blocks in a wait loop. Messages are consumed with no_ack = true
       * (automatic acknowledgement), so a message whose callback fails is only
       * logged and is not redelivered.
       *
       * The consumer is cancelled when \App\Tools\Tools::isStop() returns true
       * after a callback ran, or when the number of handled messages reaches the
       * positive limit configured for this queue in `mint.mq.loop_limit`.
       *
       * @param string $exchange
       * @param string $queue
       * @param callable|null $callback Receives the json_decode()d body; must
       *                                return int 0 on success, anything else is
       *                                logged as a task error.
       */
      public static function worker($exchange, $queue, $callback = null)
      {
          $consumerTag = 'consumer';

          $connection = self::connection();
          if ($connection === null) {
              return;
          }

          $channel = null;
          try {
              $channel = $connection->channel();

              /*
                  The declarations below mirror the producer side so that the queue
                  and exchange always exist before consuming.

                  queue:    passive=false, durable=true (survives broker restarts),
                            exclusive=false, auto_delete=false
                  exchange: type=direct, passive=false, durable=true, auto_delete=false
              */
              $channel->queue_declare($queue, false, true, false, false);
              $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
              $channel->queue_bind($queue, $exchange);

              // Per-queue cap on handled messages; 0 (or a missing entry) means unlimited.
              $limits = config('mint.mq.loop_limit');
              $loopLimit = (is_array($limits) && isset($limits[$queue])) ? $limits[$queue] : 0;
              // Counter lives in this call only, instead of in $GLOBALS under the queue name.
              $handled = 0;

              /**
               * @param \PhpAmqpLib\Message\AMQPMessage $message
               */
              $process_message = static function ($message) use ($callback, $queue, $loopLimit, &$handled) {
                  $messageId = self::property($message, 'message_id');
                  Log::debug('received message', [
                      'message_id' => $messageId,
                      'content_type' => self::property($message, 'content_type'),
                  ]);

                  $cancel = false;

                  if ($callback !== null) {
                      try {
                          $result = $callback(json_decode($message->getBody()));
                          Log::debug('mq done', ['message_id' => $messageId]);
                          if ($result !== 0) {
                              throw new \Exception('task error');
                          }
                      } catch (\Throwable $e) {
                          // Catch errors too, so one bad message cannot kill the worker.
                          Log::error("mq worker {$queue} exception", [
                              'queue' => $queue,
                              'message_id' => $messageId,
                              'exception' => $e,
                          ]);
                      }

                      if (\App\Tools\Tools::isStop()) {
                          Log::info('mq worker: .stop file exist. cancel the consumer.');
                          $cancel = true;
                      }
                  }

                  if ($loopLimit > 0) {
                      $handled++;
                      if ($handled >= $loopLimit) {
                          Log::info("mq exit queue={$queue} loop=" . $handled);
                          $cancel = true;
                      }
                  }

                  // Cancel at most once even when both conditions above fire.
                  if ($cancel) {
                      $message->getChannel()->basic_cancel($message->getConsumerTag());
                  }
              };

              /*
                  queue:        queue to read from
                  consumer_tag: consumer identifier
                  no_local:     false
                  no_ack:       true (automatic acknowledgement,
                                see https://www.rabbitmq.com/confirms.html)
                  exclusive:    false
                  nowait:       false
                  callback:     handler defined above
              */
              $channel->basic_consume($queue, $consumerTag, false, true, false, false, $process_message);

              // Safety net for exits that bypass the finally block below (e.g. exit()).
              register_shutdown_function(static function () use ($channel, $connection) {
                  self::release($channel, $connection);
              });

              $timeout = 15;
              // Loop as long as the channel has callbacks registered.
              while ($channel->is_consuming()) {
                  try {
                      $channel->wait(null, false, $timeout);
                  } catch (AMQPTimeoutException $e) {
                      // No message within the timeout window; keep waiting.
                  }
              }
          } finally {
              self::release($channel, $connection);
          }
      }
  }