RabbitMQService.php 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. <?php
  2. namespace App\Services;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Channel\AMQPChannel;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Support\Facades\Log;
  8. use Illuminate\Support\Str;
  9. use App\Exceptions\TaskFailException;
  /**
   * RabbitMQ helper built on php-amqplib: opens one connection/channel, declares
   * queues (optionally wired to a dead-letter exchange), publishes JSON messages
   * and runs a consumer loop with delayed retries.
   *
   * Connection, consumer and retry settings are read from
   * `queue.connections.rabbitmq`; queue declaration settings are read from
   * `mint.rabbitmq.queues.*` and `mint.rabbitmq.dead_letter_queues.*`.
   */
  class RabbitMQService
  {
      /** @var AMQPStreamConnection Broker connection opened by connect(). */
      private $connection;

      /** @var AMQPChannel Channel used for every declare/publish/consume call. */
      private $channel;

      /** @var array Value of config('queue.connections.rabbitmq'); indexed as an array throughout this class. */
      private $config;

      /**
       * Loads the `queue.connections.rabbitmq` configuration and connects immediately.
       */
      public function __construct()
      {
          $this->config = config('queue.connections.rabbitmq');
          $this->connect();
      }

      /**
       * Opens the AMQP connection and channel from the loaded configuration and
       * sets the channel prefetch count to 1.
       */
      private function connect(): void
      {
          $settings = $this->config;

          $this->connection = new AMQPStreamConnection(
              $settings['host'],
              $settings['port'],
              $settings['user'],
              $settings['password'],
              $settings['virtual_host']
          );
          $this->channel = $this->connection->channel();

          // QoS: hand over only one unacknowledged message at a time.
          $this->channel->basic_qos(null, 1, null);
      }

      /**
       * Returns the channel this service operates on.
       */
      public function getChannel(): AMQPChannel
      {
          return $this->channel;
      }

      /**
       * Declares the durable queue `$queueName` and, when configured, its
       * dead-letter exchange and dead-letter queue.
       *
       * Settings come from `mint.rabbitmq.queues.<queueName>` (`ttl`,
       * `max_length`, `dead_letter_exchange`, `dead_letter_queue`) and, for the
       * dead-letter queue, from `mint.rabbitmq.dead_letter_queues.<dlqName>`.
       *
       * @param string $queueName Name of the queue to declare.
       *
       * @throws \InvalidArgumentException When `dead_letter_exchange` is configured without `dead_letter_queue`.
       */
      public function setupQueue(string $queueName): void
      {
          $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
          if (!is_array($queueConfig)) {
              $queueConfig = [];
          }

          $workerArgs = $this->buildLimitArguments($queueConfig);

          if (isset($queueConfig['dead_letter_exchange'])) {
              if (!isset($queueConfig['dead_letter_queue'])) {
                  throw new \InvalidArgumentException(
                      "Queue {$queueName} configures dead_letter_exchange without dead_letter_queue."
                  );
              }

              $dlxName = $queueConfig['dead_letter_exchange'];
              $dlqName = $queueConfig['dead_letter_queue'];

              // Dead-letter exchange: direct, non-passive, durable, not auto-deleted.
              $this->channel->exchange_declare($dlxName, 'direct', false, true, false);

              $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
              if (!is_array($dlqConfig)) {
                  $dlqConfig = [];
              }

              // Dead-letter queue.
              $this->channel->queue_declare(
                  $dlqName,
                  false, // passive
                  true,  // durable
                  false, // exclusive
                  false, // auto_delete
                  false, // nowait
                  new AMQPTable($this->buildLimitArguments($dlqConfig))
              );

              // The exchange is 'direct' and the main queue dead-letters with the
              // DLQ name as routing key (see below), so the binding key must be the
              // DLQ name too; the default empty binding key would never match.
              $this->channel->queue_bind($dlqName, $dlxName, $dlqName);

              // Main queue: route rejected/expired messages to the dead-letter exchange.
              $workerArgs['x-dead-letter-exchange'] = $dlxName;
              $workerArgs['x-dead-letter-routing-key'] = $dlqName;
          }

          $this->channel->queue_declare(
              $queueName,
              false, // passive
              true,  // durable
              false, // exclusive
              false, // auto_delete
              false, // nowait
              new AMQPTable($workerArgs)
          );
      }

      /**
       * Maps the optional `ttl` / `max_length` settings of a queue configuration
       * to the `x-message-ttl` / `x-max-length` queue arguments.
       *
       * @param array $settings Queue configuration array.
       *
       * @return array Queue arguments; empty when neither setting is present.
       */
      private function buildLimitArguments(array $settings): array
      {
          $arguments = [];

          if (isset($settings['ttl'])) {
              $arguments['x-message-ttl'] = $settings['ttl'];
          }
          if (isset($settings['max_length'])) {
              $arguments['x-max-length'] = $settings['max_length'];
          }

          return $arguments;
      }

      /**
       * Declares the queue and publishes `$data` to it as a persistent JSON
       * message through the default exchange.
       *
       * @param string $queueName Target queue (used as routing key).
       * @param array  $data      Payload, JSON-encoded with JSON_UNESCAPED_UNICODE.
       *
       * @return bool Always true; every failure is logged and rethrown.
       *
       * @throws \Exception Any error raised while declaring, encoding or publishing.
       */
      public function publishMessage(string $queueName, array $data): bool
      {
          try {
              $this->setupQueue($queueName);

              $payload = json_encode($data, JSON_UNESCAPED_UNICODE);
              if ($payload === false) {
                  // Without this check a failed encode would be published as an empty body.
                  throw new \RuntimeException(
                      'Failed to JSON-encode message payload: ' . json_last_error_msg()
                  );
              }

              // Str::uuid() returns an object; the AMQP property must be a string.
              $msgId = (string) Str::uuid();

              $message = new AMQPMessage(
                  $payload,
                  [
                      'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                      'timestamp' => time(),
                      'message_id' => $msgId,
                      'content_type' => 'application/json; charset=utf-8',
                  ]
              );

              $this->channel->basic_publish($message, '', $queueName);
              Log::info("Message published to queue: {$queueName} msg id={$msgId}");

              return true;
          } catch (\Exception $e) {
              Log::error("Failed to publish message to queue: {$queueName}", [
                  'error' => $e->getMessage(),
              ]);
              throw $e;
          }
      }

      /**
       * Consumes `$queueName`, passing each decoded message to `$callback` until
       * `$maxIterations` messages have been handled or the channel stops consuming.
       *
       * Outcome per message:
       *  - callback returns: the message is acked;
       *  - callback throws TaskFailException: nacked without requeue (no retry);
       *  - callback throws any other \Exception: republished to a delay queue while
       *    the retry count is below `queues.<queueName>.retry_count`, otherwise
       *    nacked without requeue.
       *
       * NOTE(review): `consumer.sleep_between_iterations` is passed to wait() as its
       * timeout. php-amqplib documents that wait() raises AMQPTimeoutException when
       * a non-zero timeout elapses; that exception is not caught here and reaches
       * the caller — confirm this is intended.
       *
       * @param string   $queueName     Queue to consume from.
       * @param callable $callback      Invoked as $callback($data, $retryCount), where $data is
       *                                the body decoded with json_decode(..., true).
       * @param int|null $maxIterations Messages to handle before returning; null uses
       *                                `consumer.max_iterations` from the connection config.
       */
      public function consume(string $queueName, callable $callback, ?int $maxIterations = null): void
      {
          $this->setupQueue($queueName);

          $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
          $iteration = 0;

          $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, &$iteration) {
              // Initialised up front so the catch blocks never read undefined
              // variables when a failure happens before they are computed.
              $retryCount = 0;
              $maxRetries = 0;

              try {
                  $data = json_decode($msg->getBody(), true);
                  $retryCount = $this->getRetryCount($msg);
                  $maxRetries = $this->config['queues'][$queueName]['retry_count'];

                  Log::info("Processing message from queue: {$queueName}", [
                      'data' => $data,
                      'retry_count' => $retryCount,
                      'delivery_tag' => $msg->getDeliveryTag(),
                  ]);

                  // Hand the message to the caller's handler.
                  $callback($data, $retryCount);

                  // Handler succeeded: acknowledge.
                  $msg->ack();
                  Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
              } catch (TaskFailException $e) {
                  // Permanent failure: reject without requeue, no retry.
                  Log::error("Error processing message", [
                      'error' => $e->getMessage(),
                      'delivery_tag' => $msg->getDeliveryTag(),
                  ]);
                  $msg->nack(false, false);
              } catch (\Exception $e) {
                  if ($retryCount < $maxRetries) {
                      // Retry later through a TTL delay queue.
                      $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
                      Log::warning("Message requeued for retry", [
                          'error' => $e->getMessage(),
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount + 1,
                      ]);
                  } else {
                      // Retries exhausted: reject so the broker dead-letters it (if configured).
                      $msg->nack(false, false);
                      Log::error("Message rejected after max retries", [
                          'error' => $e->getMessage(),
                          'delivery_tag' => $msg->getDeliveryTag(),
                          'retry_count' => $retryCount,
                      ]);
                  }
              }

              $iteration++;
          };

          $this->channel->basic_qos(null, 1, null);
          $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);

          Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);

          while ($this->channel->is_consuming() && $iteration < $maxIterations) {
              $this->channel->wait(null, false, $this->config['consumer']['sleep_between_iterations']);
          }

          Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
      }

      /**
       * Reads the `x-retry-count` application header of a message.
       *
       * @param AMQPMessage $msg Delivered message.
       *
       * @return int Header value cast to int, or 0 when the header is absent.
       */
      private function getRetryCount(AMQPMessage $msg): int
      {
          if (!$msg->has('application_headers')) {
              return 0;
          }

          $headers = $msg->get('application_headers');
          // Convert the header table to a plain array instead of indexing the
          // AMQPTable object directly.
          $native = $headers instanceof AMQPTable ? $headers->getNativeData() : (array) $headers;

          return isset($native['x-retry-count']) ? (int) $native['x-retry-count'] : 0;
      }

      /**
       * Schedules a retry: republishes the message to a per-attempt delay queue
       * whose TTL dead-letters it back to `$queueName`, then acks the original.
       *
       * The delay queue is named `<queueName>_delay_<retryCount>` and its TTL is
       * `queues.<queueName>.retry_delay` from the connection config.
       *
       * @param AMQPMessage $msg        Message that failed processing.
       * @param string      $queueName  Queue the message should return to.
       * @param int         $retryCount Retry number stored in the `x-retry-count` header.
       */
      private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
      {
          $delay = $this->config['queues'][$queueName]['retry_delay'];
          $delayQueueName = "{$queueName}_delay_{$retryCount}";

          // Delay queue: expired messages are dead-lettered via the default
          // exchange straight back to the work queue.
          $this->channel->queue_declare(
              $delayQueueName,
              false, // passive
              true,  // durable
              false, // exclusive
              false, // auto_delete
              false, // nowait
              new AMQPTable([
                  'x-message-ttl' => $delay,
                  'x-dead-letter-exchange' => '',
                  'x-dead-letter-routing-key' => $queueName,
              ])
          );

          // Keep the original properties (message_id, timestamp, content_type, ...)
          // and the original body bytes; only the retry header is replaced.
          $properties = $msg->get_properties();
          $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
          $properties['application_headers'] = new AMQPTable([
              'x-retry-count' => $retryCount,
          ]);

          $retryMessage = new AMQPMessage($msg->getBody(), $properties);
          $this->channel->basic_publish($retryMessage, '', $delayQueueName);

          // Ack only after the copy has been published.
          $msg->ack();
      }

      /**
       * Closes the channel and then the connection.
       *
       * The connection is closed even when closing the channel throws.
       */
      public function close(): void
      {
          try {
              if ($this->channel) {
                  $this->channel->close();
              }
          } finally {
              if ($this->connection) {
                  $this->connection->close();
              }
          }
      }
  }