Mq.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. <?php
  2. namespace App\Http\Api;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use PhpAmqpLib\Exchange\AMQPExchangeType;
  6. use Illuminate\Support\Facades\Log;
  7. use Illuminate\Support\Str;
  /**
   * Static helper around php-amqplib: publishes JSON messages to RabbitMQ
   * queues and runs a blocking consumer loop.
   *
   * Connection settings are read from the `queue.connections.rabbitmq.*`
   * configuration keys (host, port, user, password, virtual_host).
   */
  class Mq
  {
      /**
       * Maximum number of seconds one wait for the next delivery may block
       * before the consumer loop re-checks whether it is still consuming.
       */
      private const WAIT_TIMEOUT = 15;

      /**
       * Number of messages handled so far in this process, keyed by queue name.
       * Compared against the `mint.mq.loop_limit` configuration.
       *
       * @var array<string, int>
       */
      private static $handled = [];

      /**
       * Open a connection using the configured RabbitMQ settings.
       *
       * @return AMQPStreamConnection|null null when any setting is empty
       *                                   (the problem is logged).
       */
      private static function connection(): ?AMQPStreamConnection
      {
          $host = config('queue.connections.rabbitmq.host');
          $port = config('queue.connections.rabbitmq.port');
          $user = config('queue.connections.rabbitmq.user');
          $password = config('queue.connections.rabbitmq.password');
          $vhost = config('queue.connections.rabbitmq.virtual_host');

          if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
              Log::error('rabbitmq set error');
              return null;
          }

          return new AMQPStreamConnection($host, $port, $user, $password, $vhost);
      }

      /**
       * Publish one message straight to a queue (one-to-one delivery through
       * the default exchange).
       *
       * The queue is declared durable before publishing. Failures are logged,
       * never thrown.
       *
       * @param string $channelName name of the target queue
       * @param mixed  $message     payload; sent JSON-encoded
       * @return void
       */
      public static function publish(string $channelName, $message)
      {
          $connection = null;
          try {
              Log::debug('mq publish {channel} {message}', ['channel' => $channelName, 'message' => $message]);

              // Encode before connecting so an unencodable payload costs no round trip.
              $body = json_encode($message, JSON_UNESCAPED_UNICODE);
              if ($body === false) {
                  Log::error('mq publish: message is not JSON-encodable: ' . json_last_error_msg());
                  return;
              }

              $connection = self::connection();
              if ($connection === null) {
                  return;
              }

              $channel = $connection->channel();
              $channel->queue_declare($channelName, false, true, false, false);

              // Str::uuid() returns an object; the message property must be a string.
              $msgId = (string) Str::uuid();
              Log::info('mq push message id=' . $msgId);

              $msg = new AMQPMessage(
                  $body,
                  [
                      'message_id' => $msgId,
                      'content_type' => 'application/json; charset=utf-8',
                  ]
              );
              $channel->basic_publish($msg, '', $channelName);
              $channel->close();
          } catch (\Exception $e) {
              Log::error($e);
          } finally {
              // Always release the socket, including when declaring or publishing failed.
              if ($connection !== null) {
                  try {
                      $connection->close();
                  } catch (\Exception $e) {
                      Log::error($e);
                  }
              }
          }
      }

      /**
       * Consume a queue until the consumer is cancelled.
       *
       * Declares the durable queue and a durable direct exchange, binds them,
       * then handles deliveries one at a time. For each delivery the JSON-decoded
       * body is passed to $callback; a return value other than 0, or anything
       * thrown by the callback, is logged and reported to the `issues` queue.
       * The delivery is acknowledged in both cases.
       *
       * The consumer is cancelled when \App\Tools\Tools::isStop() is true after
       * a task, or when the number of handled messages reaches the value
       * configured for this queue under `mint.mq.loop_limit`.
       *
       * @param string        $exchange
       * @param string        $queue
       * @param callable|null $callback receives the decoded message; must return 0 on success
       * @return void
       * @throws \RuntimeException when the RabbitMQ configuration is incomplete
       */
      public static function worker($exchange, $queue, $callback = null)
      {
          $consumerTag = 'consumer';

          $connection = self::connection();
          if ($connection === null) {
              // connection() has already logged the cause; a worker cannot run without a broker.
              throw new \RuntimeException('rabbitmq set error');
          }
          $channel = $connection->channel();

          // Close channel and connection exactly once, whether the loop ends
          // normally or the process shuts down while a task is running.
          $closed = false;
          $shutdown = static function () use ($channel, $connection, &$closed) {
              if ($closed) {
                  return;
              }
              $closed = true;
              try {
                  $channel->close();
                  $connection->close();
              } catch (\Throwable $e) {
                  Log::error('mq worker close failed', ['exception' => $e]);
              }
          };
          register_shutdown_function($shutdown);

          try {
              // Same declarations as on the producer side, so the queue and the
              // exchange exist no matter which side starts first.
              // queue: passive=false, durable=true, exclusive=false, auto_delete=false
              $channel->queue_declare($queue, false, true, false, false);
              // exchange: type=direct, passive=false, durable=true, auto_delete=false
              $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
              $channel->queue_bind($queue, $exchange);

              $loopLimit = self::loopLimit($queue);

              /**
               * @param \PhpAmqpLib\Message\AMQPMessage $message
               */
              $processMessage = static function ($message) use ($callback, $connection, $exchange, $queue, $loopLimit) {
                  $messageId = self::messageProperty($message, 'message_id');
                  Log::debug('received message', [
                      'message_id' => $messageId,
                      'content_type' => self::messageProperty($message, 'content_type'),
                  ]);

                  $stop = false;
                  if ($callback !== null) {
                      try {
                          $result = $callback(json_decode($message->body));
                          if (\App\Tools\Tools::isStop()) {
                              Log::debug('mq worker: .stop file exist. cancel the consumer.');
                              $stop = true;
                          }
                          if ($result !== 0) {
                              throw new \Exception('task error');
                          }
                      } catch (\Throwable $e) {
                          Log::error('mq worker exception', ['exception' => $e]);
                          // If reporting itself fails the error propagates, the
                          // delivery stays unacknowledged and the loop below stops.
                          self::reportIssue($connection, $exchange, $queue, $message, $e);
                      }
                  }

                  $message->ack();
                  Log::debug('mq done', ['message_id' => $messageId]);

                  if ($loopLimit > 0) {
                      $count = (self::$handled[$queue] ?? 0) + 1;
                      self::$handled[$queue] = $count;
                      if ($count >= $loopLimit) {
                          Log::debug('mq exit loop=' . $count);
                          $stop = true;
                      }
                  }

                  // Cancel once, even when both stop conditions are met.
                  if ($stop) {
                      $message->getChannel()->basic_cancel($message->getConsumerTag());
                  }
              };

              // no_local=false, no_ack=false (manual ack), exclusive=false, nowait=false
              $channel->basic_consume($queue, $consumerTag, false, false, false, false, $processMessage);

              // Loop as long as the channel has callbacks registered.
              while ($channel->is_consuming()) {
                  try {
                      $channel->wait(null, false, self::WAIT_TIMEOUT);
                  } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                      // Nothing arrived within the timeout; keep waiting.
                      continue;
                  } catch (\Throwable $e) {
                      // Anything else is not retried; log it and stop.
                      Log::error('mq worker stopped', ['exception' => $e]);
                      break;
                  }
              }
          } finally {
              $shutdown();
          }
      }

      /**
       * Read a message property, or null when the message does not carry it.
       *
       * AMQPMessage::get() throws for a property that was never set, which is
       * the case for messages published without a property table.
       *
       * @param \PhpAmqpLib\Message\AMQPMessage $message
       * @param string                          $name
       * @return mixed|null
       */
      private static function messageProperty($message, $name)
      {
          return $message->has($name) ? $message->get($name) : null;
      }

      /**
       * Number of messages after which the consumer of $queue is cancelled,
       * taken from `mint.mq.loop_limit`; 0 when no limit is configured.
       *
       * @param string $queue
       */
      private static function loopLimit($queue): int
      {
          $limits = config('mint.mq.loop_limit');
          if (!is_array($limits)) {
              return 0;
          }

          return (int) ($limits[$queue] ?? 0);
      }

      /**
       * Publish a failure report for a task to the durable `issues` queue.
       *
       * @param \PhpAmqpLib\Connection\AbstractConnection $connection
       * @param string                                    $exchange
       * @param string                                    $queue
       * @param \PhpAmqpLib\Message\AMQPMessage           $message the delivery whose task failed
       * @param \Throwable                                $error
       * @return void
       */
      private static function reportIssue($connection, $exchange, $queue, $message, \Throwable $error)
      {
          $channelName = 'issues';
          $channelIssues = $connection->channel();
          $channelIssues->queue_declare($channelName, false, true, false, false);

          $msg = new AMQPMessage(json_encode([
              'exchange' => $exchange,
              'channel' => $queue,
              'message' => json_decode($message->body),
              'result' => 0,
              // An exception object has no public properties and would encode
              // as {}, so the useful parts are spelled out.
              'error' => [
                  'class' => get_class($error),
                  'message' => $error->getMessage(),
                  'code' => $error->getCode(),
              ],
          ], JSON_UNESCAPED_UNICODE | JSON_PARTIAL_OUTPUT_ON_ERROR));

          $channelIssues->basic_publish($msg, '', $channelName);
          $channelIssues->close();
      }
  }