RabbitMQService.php 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. <?php
  2. namespace App\Services;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Channel\AMQPChannel;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Support\Facades\Log;
  8. use Illuminate\Support\Str;
  /**
   * Wrapper around a single php-amqplib connection and channel.
   *
   * Declares queues (with optional dead-letter wiring), publishes JSON
   * messages, and runs a consumer loop that retries failed messages through
   * per-attempt delay queues before rejecting them.
   */
  class RabbitMQService
  {
      /** @var AMQPStreamConnection */
      private $connection;

      /** @var AMQPChannel */
      private $channel;

      /** @var array Value of config('queue.connections.rabbitmq'). */
      private $config;

      /**
       * Loads the connection settings and connects immediately.
       */
      public function __construct()
      {
          $this->config = config('queue.connections.rabbitmq');
          $this->connect();
      }

      /**
       * Opens the broker connection and an initial channel.
       */
      private function connect()
      {
          $settings = $this->config;
          $this->connection = new AMQPStreamConnection(
              $settings['host'],
              $settings['port'],
              $settings['user'],
              $settings['password'],
              $settings['virtual_host']
          );
          $this->openChannel();
      }

      /**
       * Opens a new channel on the current connection and applies QoS.
       */
      private function openChannel(): void
      {
          $this->channel = $this->connection->channel();
          // QoS: prefetch count 1, so only one message is handled at a time.
          $this->channel->basic_qos(null, 1, null);
      }

      /**
       * Returns the channel currently in use.
       *
       * Note: setupQueue() may replace the channel, so callers should not
       * hold on to the returned instance across calls.
       */
      public function getChannel(): AMQPChannel
      {
          return $this->channel;
      }

      /**
       * Ensures the named queue exists, together with its dead-letter
       * exchange and dead-letter queue when those are configured under
       * mint.rabbitmq.queues.<name>.
       *
       * An existing queue is left as it is (passive check only); a missing
       * queue is created with the configured arguments.
       *
       * @param string $queueName Queue to check or create.
       *
       * @throws \Exception Any broker error other than the queue not existing.
       */
      public function setupQueue(string $queueName): void
      {
          $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
          $workerArgs = $this->limitArguments($queueConfig);

          if (isset($queueConfig['dead_letter_exchange'])) {
              $dlxName = $queueConfig['dead_letter_exchange'];

              // Dead-letter exchange: direct, durable, not auto-deleted.
              $this->channel->exchange_declare($dlxName, 'direct', false, true, false);

              $dlqName = $queueConfig['dead_letter_queue'];
              $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);

              // Dead-letter queue.
              $this->channel->queue_declare(
                  $dlqName,
                  false, // passive
                  true, // durable
                  false, // exclusive
                  false, // auto_delete
                  false, // nowait
                  new AMQPTable($this->limitArguments($dlqConfig))
              );

              // The exchange is 'direct', so the binding key must equal the
              // x-dead-letter-routing-key set on the main queue below;
              // otherwise dead-lettered messages match no binding.
              $this->channel->queue_bind($dlqName, $dlxName, $dlqName);

              // Main queue dead-letter settings.
              $workerArgs['x-dead-letter-exchange'] = $dlxName;
              $workerArgs['x-dead-letter-routing-key'] = $dlqName;
          }

          $arguments = new AMQPTable($workerArgs);

          // Passive declare: only checks whether the queue already exists.
          try {
              $this->channel->queue_declare(
                  $queueName,
                  true, // passive
                  true, // durable
                  false, // exclusive
                  false, // auto_delete
                  false, // nowait
                  $arguments
              );
              // Exists; nothing to create.
          } catch (\Exception $e) {
              if (strpos($e->getMessage(), 'NOT_FOUND') === false) {
                  Log::error("Error checking queue $queueName: {$e->getMessage()}");
                  throw $e;
              }

              Log::info("Queue $queueName does not exist.");

              // The broker closes the channel when a passive declare fails,
              // so the queue has to be created on a fresh channel.
              $this->openChannel();

              $this->channel->queue_declare(
                  $queueName,
                  false, // passive
                  true, // durable
                  false, // exclusive
                  false, // auto_delete
                  false, // nowait
                  $arguments
              );
          }
      }

      /**
       * Maps the optional 'ttl' and 'max_length' config entries to their
       * x-message-ttl / x-max-length queue arguments.
       *
       * @param array|null $config Queue configuration; may be null when unset.
       *
       * @return array Queue arguments (possibly empty).
       */
      private function limitArguments($config): array
      {
          $args = [];
          if (isset($config['ttl'])) {
              $args['x-message-ttl'] = $config['ttl'];
          }
          if (isset($config['max_length'])) {
              $args['x-max-length'] = $config['max_length'];
          }

          return $args;
      }

      /**
       * Publishes $data as a persistent JSON message to the named queue via
       * the default exchange, declaring the queue first if needed.
       *
       * @param string $queueName Target queue (used as the routing key).
       * @param array  $data      Payload to JSON-encode.
       *
       * @return bool Always true; failures are logged and rethrown.
       *
       * @throws \Exception When queue setup, encoding or publishing fails.
       */
      public function publishMessage(string $queueName, array $data): bool
      {
          try {
              $this->setupQueue($queueName);

              $payload = json_encode($data, JSON_UNESCAPED_UNICODE);
              if ($payload === false) {
                  // Do not publish an empty body when encoding fails.
                  throw new \RuntimeException(
                      'Failed to JSON-encode message payload: ' . json_last_error_msg()
                  );
              }

              // Str::uuid() returns an object; AMQP properties expect a string.
              $msgId = (string) Str::uuid();
              $message = new AMQPMessage(
                  $payload,
                  [
                      'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                      'timestamp' => time(),
                      'message_id' => $msgId,
                      "content_type" => 'application/json; charset=utf-8'
                  ]
              );
              $this->channel->basic_publish($message, '', $queueName);
              Log::info("Message published to queue: {$queueName} msg id={$msgId}");

              return true;
          } catch (\Exception $e) {
              Log::error("Failed to publish message to queue: {$queueName}", [
                  'error' => $e->getMessage(),
              ]);
              throw $e;
          }
      }

      /**
       * Consumes messages from the named queue until $maxIterations messages
       * have been handled or the channel stops consuming.
       *
       * The callback receives the decoded body and the current retry count.
       * Returning exactly true acknowledges the message. Any other result
       * re-queues it through a delay queue while retries remain, and rejects
       * it without requeue afterwards. An exception thrown while handling a
       * message also rejects it without requeue.
       *
       * @param string   $queueName     Queue to consume from.
       * @param callable $callback      function (mixed $data, int $retryCount): mixed
       * @param int|null $maxIterations Message limit; defaults to
       *                                config consumer.max_iterations.
       */
      public function consume(string $queueName, callable $callback, ?int $maxIterations = null): void
      {
          $this->setupQueue($queueName);
          $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
          $iteration = 0;

          $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, &$iteration) {
              try {
                  $data = json_decode($msg->getBody(), true);
                  $retryCount = $this->getRetryCount($msg);
                  $maxRetries = $this->config['queues'][$queueName]['retry_count'];

                  Log::info("Processing message from queue: {$queueName}", [
                      'data' => $data,
                      'retry_count' => $retryCount,
                      'delivery_tag' => $msg->getDeliveryTag()
                  ]);

                  // Hand the message to the caller-supplied handler.
                  $result = $callback($data, $retryCount);

                  if ($result === true) {
                      // Success: acknowledge.
                      $msg->ack();
                      Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
                  } elseif ($retryCount < $maxRetries) {
                      // Failure with retries left: re-queue through a delay queue.
                      $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
                      Log::warning("Message requeued for retry", [
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount + 1
                      ]);
                  } else {
                      // Retries exhausted: reject without requeue (dead-lettered
                      // when the queue has a dead-letter exchange).
                      $msg->nack(false, false);
                      Log::error("Message rejected after max retries", [
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount
                      ]);
                  }
              } catch (\Exception $e) {
                  Log::error("Error processing message", [
                      'error' => $e->getMessage(),
                      'delivery_tag' => $msg->getDeliveryTag()
                  ]);
                  $msg->nack(false, false);
              }

              $iteration++;
          };

          // setupQueue() may have swapped the channel, so apply QoS again.
          $this->channel->basic_qos(null, 1, null);
          $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);

          Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);

          while ($this->channel->is_consuming() && $iteration < $maxIterations) {
              // NOTE(review): 'sleep_between_iterations' is passed as the wait()
              // timeout; behavior when it elapses with no message is left as in
              // the original (no handling here) - confirm callers expect that.
              $this->channel->wait(null, false, $this->config['consumer']['sleep_between_iterations']);
          }

          Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
      }

      /**
       * Reads the x-retry-count application header from a message.
       *
       * @return int The header value, or 0 when it is absent.
       */
      private function getRetryCount(AMQPMessage $msg): int
      {
          $properties = $msg->get_properties();
          $headers = $properties['application_headers'] ?? null;

          // Headers normally arrive as an AMQPTable; convert to a plain array
          // rather than relying on the table supporting array access.
          if ($headers instanceof AMQPTable) {
              $headers = $headers->getNativeData();
          }

          if (is_array($headers) && isset($headers['x-retry-count'])) {
              return (int) $headers['x-retry-count'];
          }

          return 0;
      }

      /**
       * Republishes a failed message to a per-attempt delay queue and
       * acknowledges the original delivery.
       *
       * The delay queue has a message TTL equal to the configured retry_delay
       * and dead-letters expired messages back to $queueName through the
       * default exchange.
       *
       * @param AMQPMessage $msg        Delivery being retried.
       * @param string      $queueName  Queue the message returns to.
       * @param int         $retryCount Attempt number stored in x-retry-count.
       */
      private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
      {
          $delay = $this->config['queues'][$queueName]['retry_delay'];

          // Delay queue for this attempt.
          $delayQueueName = "{$queueName}_delay_{$retryCount}";
          $this->channel->queue_declare(
              $delayQueueName,
              false, // passive
              true, // durable
              false, // exclusive
              false, // auto_delete
              false, // nowait
              new AMQPTable([
                  'x-message-ttl' => $delay,
                  'x-dead-letter-exchange' => '',
                  'x-dead-letter-routing-key' => $queueName,
              ])
          );

          // Republish the original body and properties unchanged (message_id,
          // timestamp, content_type); only the retry header is replaced.
          $properties = $msg->get_properties();
          $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
          $properties['application_headers'] = new AMQPTable([
              'x-retry-count' => $retryCount
          ]);

          $retryMessage = new AMQPMessage($msg->getBody(), $properties);
          $this->channel->basic_publish($retryMessage, '', $delayQueueName);

          $msg->ack();
      }

      /**
       * Closes the channel and then the connection.
       *
       * The connection is closed even when closing the channel throws.
       */
      public function close(): void
      {
          try {
              if ($this->channel) {
                  $this->channel->close();
              }
          } finally {
              if ($this->connection) {
                  $this->connection->close();
              }
          }
      }
  }