RabbitMQService.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. <?php
  2. namespace App\Services;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Channel\AMQPChannel;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Support\Facades\Log;
  /**
   * Wrapper around a single php-amqplib connection/channel that declares
   * queues with dead-letter routing, publishes JSON messages, and runs a
   * consumer loop with delayed retries.
   *
   * NOTE(review): setupQueue() reads queue settings from `mint.rabbitmq.*`
   * while consume()/requeueWithDelay() read `queues.*` and `consumer.*` under
   * `queue.connections.rabbitmq` — confirm both config trees are populated.
   */
  class RabbitMQService
  {
      /** @var AMQPStreamConnection Connection opened by connect(). */
      private $connection;

      /** @var AMQPChannel Channel opened on $connection. */
      private $channel;

      /** @var array Contents of the `queue.connections.rabbitmq` config entry. */
      private $config;

      /**
       * Loads the connection settings and connects to the broker immediately.
       */
      public function __construct()
      {
          $this->config = config('queue.connections.rabbitmq');
          $this->connect();
      }

      /**
       * Opens the stream connection and a channel with a prefetch count of 1.
       */
      private function connect()
      {
          $conn = $this->config;
          $this->connection = new AMQPStreamConnection(
              $conn['host'],
              $conn['port'],
              $conn['user'],
              $conn['password'],
              $conn['virtual_host']
          );
          $this->channel = $this->connection->channel();
          // QoS: deliver only one unacknowledged message at a time.
          $this->channel->basic_qos(null, 1, null);
      }

      /**
       * Returns the channel opened by connect().
       */
      public function getChannel(): AMQPChannel
      {
          return $this->channel;
      }

      /**
       * Declares the dead-letter exchange, the dead-letter queue, their binding,
       * and the main queue configured to dead-letter into that exchange.
       *
       * @param string $queueName Key under `mint.rabbitmq.queues` and name of the main queue.
       *
       * @throws \InvalidArgumentException When the queue has no usable configuration.
       */
      public function setupQueue(string $queueName): void
      {
          $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
          if (!is_array($queueConfig)
              || !isset($queueConfig['dead_letter_exchange'], $queueConfig['dead_letter_queue'])
          ) {
              throw new \InvalidArgumentException(
                  "RabbitMQ queue '{$queueName}' is not configured under mint.rabbitmq.queues "
                  . '(dead_letter_exchange and dead_letter_queue are required)'
              );
          }

          $dlxName = $queueConfig['dead_letter_exchange'];
          $dlqName = $queueConfig['dead_letter_queue'];

          // Declare the dead-letter exchange (direct, durable, not auto-deleted).
          $this->channel->exchange_declare(
              $dlxName,
              'direct',
              false,
              true,
              false
          );

          // Optional per-DLQ limits.
          $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
          $dlqArgs = [];
          if (isset($dlqConfig['ttl'])) {
              $dlqArgs['x-message-ttl'] = $dlqConfig['ttl'];
          }
          if (isset($dlqConfig['max_length'])) {
              $dlqArgs['x-max-length'] = $dlqConfig['max_length'];
          }

          // Declare the dead-letter queue.
          $this->channel->queue_declare(
              $dlqName,
              false, // passive
              true,  // durable
              false, // exclusive
              false, // auto_delete
              false, // nowait
              new AMQPTable($dlqArgs)
          );

          // Bind the dead-letter queue to the dead-letter exchange. The binding key
          // must equal the x-dead-letter-routing-key set on the main queue below:
          // a direct exchange only routes on an exact key match, so binding with the
          // default empty key would silently drop every dead-lettered message.
          $this->channel->queue_bind($dlqName, $dlxName, $dlqName);

          // Declare the main queue with dead-lettering configured.
          $arguments = new AMQPTable([
              'x-dead-letter-exchange' => $dlxName,
              'x-dead-letter-routing-key' => $dlqName, // dead-letter routing key
          ]);
          $this->channel->queue_declare(
              $queueName,
              false, // passive
              true,  // durable
              false, // exclusive
              false, // auto_delete
              false, // nowait
              $arguments
          );
      }

      /**
       * Publishes $data as a persistent JSON message to $queueName through the
       * default exchange, declaring the queue topology first.
       *
       * @param string $queueName Target queue.
       * @param array  $data      Payload; encoded with JSON_UNESCAPED_UNICODE.
       *
       * @return bool True on success; false if setup, encoding or publishing threw
       *              (the failure is logged).
       */
      public function publishMessage(string $queueName, array $data): bool
      {
          try {
              $this->setupQueue($queueName);

              $body = json_encode($data, JSON_UNESCAPED_UNICODE);
              if ($body === false) {
                  // Without this check the `false` result would be published as the body.
                  throw new \RuntimeException(
                      'Failed to encode message as JSON: ' . json_last_error_msg()
                  );
              }

              $message = new AMQPMessage(
                  $body,
                  [
                      'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                      'timestamp' => time(),
                      'message_id' => uniqid(),
                      'content_type' => 'application/json; charset=utf-8',
                  ]
              );
              $this->channel->basic_publish($message, '', $queueName);
              Log::info("Message published to queue: {$queueName}", $data);
              return true;
          } catch (\Exception $e) {
              Log::error("Failed to publish message to queue: {$queueName}", [
                  'error' => $e->getMessage(),
                  'data' => $data
              ]);
              return false;
          }
      }

      /**
       * Consumes messages from $queueName until $maxIterations messages have been
       * handled or the channel stops consuming.
       *
       * The callback is invoked as $callback($decodedBody, $retryCount). A return
       * value of exactly `true` acknowledges the message. Any other result schedules
       * a delayed retry while the retry budget lasts, then rejects the message
       * without requeueing. Anything thrown by the callback also rejects the message.
       *
       * @param string   $queueName     Queue to consume from.
       * @param callable $callback      Message handler.
       * @param int|null $maxIterations Message budget; defaults to the
       *                                `consumer.max_iterations` config value.
       */
      public function consume(string $queueName, callable $callback, ?int $maxIterations = null): void
      {
          $this->setupQueue($queueName);
          $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
          $iteration = 0;

          $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, &$iteration) {
              try {
                  $data = json_decode($msg->getBody(), true);
                  $retryCount = $this->getRetryCount($msg);
                  $maxRetries = $this->config['queues'][$queueName]['retry_count'];
                  Log::info("Processing message from queue: {$queueName}", [
                      'data' => $data,
                      'retry_count' => $retryCount,
                      'delivery_tag' => $msg->getDeliveryTag()
                  ]);

                  // Hand the message to the caller-supplied handler.
                  $result = $callback($data, $retryCount);

                  if ($result === true) {
                      // Handled successfully: acknowledge.
                      $msg->ack();
                      Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
                  } elseif ($retryCount < $maxRetries) {
                      // Failed with retries left: park it in a delay queue.
                      $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
                      Log::warning("Message requeued for retry", [
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount + 1
                      ]);
                  } else {
                      // Retry budget exhausted: reject without requeue (dead-letters it).
                      $msg->nack(false, false);
                      Log::error("Message rejected after max retries", [
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount
                      ]);
                  }
              } catch (\Throwable $e) {
                  // \Throwable rather than \Exception: a TypeError/Error raised by the
                  // handler would otherwise escape with the message still unacked and
                  // be redelivered to the next consumer indefinitely.
                  Log::error("Error processing message", [
                      'error' => $e->getMessage(),
                      'delivery_tag' => $msg->getDeliveryTag()
                  ]);
                  $msg->nack(false, false);
              }
              $iteration++;
          };

          $this->channel->basic_qos(null, 1, null);
          $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);
          Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);

          $waitTimeout = $this->config['consumer']['sleep_between_iterations'];
          while ($this->channel->is_consuming() && $iteration < $maxIterations) {
              try {
                  $this->channel->wait(null, false, $waitTimeout);
              } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                  // wait() signals "nothing arrived within the timeout" by throwing;
                  // an idle queue is not an error, so keep polling.
                  continue;
              }
          }
          Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
      }

      /**
       * Reads the `x-retry-count` application header from a message.
       *
       * @return int The header value, or 0 when the header is absent.
       */
      private function getRetryCount(AMQPMessage $msg): int
      {
          if (!$msg->has('application_headers')) {
              return 0;
          }

          // Convert the header table to a plain array instead of indexing the
          // AMQPTable object directly, which needs ArrayAccess support in the library.
          $headers = $msg->get('application_headers');
          $native = $headers instanceof AMQPTable ? $headers->getNativeData() : (array) $headers;

          return isset($native['x-retry-count']) ? (int) $native['x-retry-count'] : 0;
      }

      /**
       * Republishes a failed message to a per-attempt delay queue whose TTL
       * dead-letters it back to $queueName, then acknowledges the original.
       *
       * @param AMQPMessage $msg        The failed delivery.
       * @param string      $queueName  Queue the message should return to.
       * @param int         $retryCount Attempt number stored in `x-retry-count`.
       */
      private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
      {
          $delay = $this->config['queues'][$queueName]['retry_delay'];

          // Declare the delay queue: expired messages are dead-lettered through the
          // default exchange straight back to the main queue.
          $delayQueueName = "{$queueName}_delay_{$retryCount}";
          $arguments = new AMQPTable([
              'x-message-ttl' => $delay,
              'x-dead-letter-exchange' => '',
              'x-dead-letter-routing-key' => $queueName,
          ]);
          $this->channel->queue_declare(
              $delayQueueName,
              false,
              true,
              false,
              false,
              false,
              $arguments
          );

          // Carry over the identifying properties set at publish time so the retry
          // can still be correlated with the original message.
          $carried = array_intersect_key(
              $msg->get_properties(),
              array_flip(['content_type', 'message_id', 'timestamp'])
          );

          // Republish the original body unchanged. Decoding and re-encoding it would
          // turn a non-JSON body into "null", change `{}` into `[]`, and alter
          // unicode escaping.
          $newMessage = new AMQPMessage(
              $msg->getBody(),
              array_merge($carried, [
                  'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                  'application_headers' => new AMQPTable([
                      'x-retry-count' => $retryCount
                  ]),
              ])
          );
          $this->channel->basic_publish($newMessage, '', $delayQueueName);
          $msg->ack();
      }

      /**
       * Closes the channel and then the connection.
       */
      public function close(): void
      {
          try {
              if ($this->channel) {
                  $this->channel->close();
              }
          } finally {
              // Always release the connection, even if closing the channel threw.
              if ($this->connection) {
                  $this->connection->close();
              }
          }
      }
  }