2
0

RabbitMQService.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. <?php
  2. namespace App\Services;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Channel\AMQPChannel;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Support\Facades\Log;
  8. use Illuminate\Support\Str;
  9. use App\Exceptions\TaskFailException;
  10. class RabbitMQService
  11. {
  12. private $connection;
  13. private $channel;
  14. private array $config;
  15. private array $queues;
  16. public function __construct()
  17. {
  18. $this->config = config('queue.connections.rabbitmq');
  19. $this->queues = config('mint.rabbitmq.queues');
  20. $this->connect();
  21. }
  22. private function connect()
  23. {
  24. $conn = $this->config;
  25. $this->connection = new AMQPStreamConnection(
  26. $conn['host'],
  27. $conn['port'],
  28. $conn['user'],
  29. $conn['password'],
  30. $conn['virtual_host']
  31. );
  32. $this->channel = $this->connection->channel();
  33. // 设置 QoS - 每次只处理一条消息
  34. $this->channel->basic_qos(null, 1, null);
  35. }
  36. public function isConnected()
  37. {
  38. return $this->connection->isConnected();
  39. }
  40. public function getChannel(): AMQPChannel
  41. {
  42. return $this->channel;
  43. }
  44. public function createQueue(): array
  45. {
  46. foreach ($this->queues as $name => $queue) {
  47. $args = [];
  48. // TTL
  49. if (!empty($queue['ttl'])) {
  50. $args['x-message-ttl'] = (int) $queue['ttl'];
  51. }
  52. // max length
  53. if (!empty($queue['max_length'])) {
  54. $args['x-max-length'] = (int) $queue['max_length'];
  55. }
  56. // dead letter exchange
  57. if (!empty($queue['dead_letter_exchange'])) {
  58. $args['x-dead-letter-exchange'] = $queue['dead_letter_exchange'];
  59. }
  60. // dead letter routing key(可选但建议)
  61. if (!empty($queue['dead_letter_queue'])) {
  62. $args['x-dead-letter-routing-key'] = $queue['dead_letter_queue'];
  63. }
  64. $this->channel->queue_declare(
  65. $name, // queue 名称
  66. false, // passive:检查是否存在(false=不存在则创建)
  67. true, // durable:是否持久化(重启 RabbitMQ 后仍存在)
  68. false, // exclusive:独占模式
  69. false, // auto_delete:是否在无消费者时自动删除
  70. false, // nowait:是否不等待服务器响应(false=阻塞等待确认)
  71. new AMQPTable($args) // arguments:附加参数(TTL / DLX / max-length 等)
  72. );
  73. }
  74. return array_keys($this->queues);
  75. }
  76. public function setupQueue(string $queueName): void
  77. {
  78. $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
  79. $workerArgs = [];
  80. if (isset($queueConfig['ttl'])) {
  81. $workerArgs['x-message-ttl'] = $queueConfig['ttl'];
  82. }
  83. if (isset($queueConfig['max_length'])) {
  84. $workerArgs['x-max-length'] = $queueConfig['max_length'];
  85. }
  86. // 创建死信交换机
  87. if (isset($queueConfig['dead_letter_exchange'])) {
  88. $this->channel->exchange_declare(
  89. $queueConfig['dead_letter_exchange'],
  90. 'direct',
  91. false,
  92. true,
  93. false
  94. );
  95. $dlqName = $queueConfig['dead_letter_queue'];
  96. $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
  97. $dlqArgs = [];
  98. if (isset($dlqConfig['ttl'])) {
  99. $dlqArgs['x-message-ttl'] = $dlqConfig['ttl'];
  100. }
  101. if (isset($dlqConfig['max_length'])) {
  102. $dlqArgs['x-max-length'] = $dlqConfig['max_length'];
  103. }
  104. $dlqArguments = new AMQPTable($dlqArgs);
  105. // 创建死信队列
  106. $this->channel->queue_declare(
  107. $dlqName,
  108. false, // passive
  109. true, // durable
  110. false, // exclusive
  111. false, // auto_delete
  112. false, // nowait
  113. $dlqArguments
  114. );
  115. // 绑定死信队列到死信交换机
  116. $this->channel->queue_bind(
  117. $queueConfig['dead_letter_queue'],
  118. $queueConfig['dead_letter_exchange']
  119. );
  120. // 主队列,配置死信
  121. $workerArgs['x-dead-letter-exchange'] = $queueConfig['dead_letter_exchange'];
  122. $workerArgs['x-dead-letter-routing-key'] = $queueConfig['dead_letter_queue'];
  123. }
  124. $arguments = new AMQPTable($workerArgs);
  125. $this->channel->queue_declare(
  126. $queueName,
  127. false, // passive
  128. true, // durable
  129. false, // exclusive
  130. false, // auto_delete
  131. false, // nowait
  132. $arguments
  133. );
  134. /*
  135. // 检查队列是否存在
  136. try {
  137. $this->channel->queue_declare(
  138. $queueName,
  139. true, // passive
  140. true, // durable
  141. false, // exclusive
  142. false, // auto_delete
  143. false, // nowait
  144. $arguments
  145. );
  146. //存在,不用创建
  147. } catch (\Exception $e) {
  148. if (strpos($e->getMessage(), 'NOT_FOUND') !== false) {
  149. Log::info("Queue $queueName does not exist.");
  150. // 队列不存在,创建新队列
  151. $this->channel->queue_declare(
  152. $queueName,
  153. false, // passive
  154. true, // durable
  155. false, // exclusive
  156. false, // auto_delete
  157. false, // nowait
  158. $arguments
  159. );
  160. } else {
  161. Log::error("Error checking queue $queueName: {$e->getMessage()}");
  162. throw $e;
  163. }
  164. }
  165. */
  166. }
  167. public function publishMessage(string $queueName, array $data): string
  168. {
  169. try {
  170. $this->setupQueue($queueName);
  171. $msgId = Str::uuid();
  172. $message = new AMQPMessage(
  173. json_encode($data, JSON_UNESCAPED_UNICODE),
  174. [
  175. 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  176. 'timestamp' => time(),
  177. 'message_id' => $msgId,
  178. "content_type" => 'application/json; charset=utf-8'
  179. ]
  180. );
  181. $this->channel->basic_publish($message, '', $queueName);
  182. Log::info("Message published to queue: {$queueName} msg id={$msgId}");
  183. return $msgId;
  184. } catch (\Exception $e) {
  185. Log::error("Failed to publish message to queue: {$queueName}", [
  186. 'error' => $e->getMessage(),
  187. ]);
  188. throw $e;
  189. }
  190. }
  191. public function consume(string $queueName, callable $callback, int $maxIterations = null): void
  192. {
  193. $this->setupQueue($queueName);
  194. $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
  195. $iteration = 0;
  196. $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, &$iteration) {
  197. try {
  198. $data = json_decode($msg->getBody(), true);
  199. $retryCount = $this->getRetryCount($msg);
  200. $maxRetries = $this->config['queues'][$queueName]['retry_count'];
  201. Log::info("Processing message from queue: {$queueName}", [
  202. 'data' => $data,
  203. 'retry_count' => $retryCount,
  204. 'delivery_tag' => $msg->getDeliveryTag()
  205. ]);
  206. // 执行回调处理消息
  207. $callback($data, $retryCount);
  208. // 处理成功,确认消息
  209. $msg->ack();
  210. Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
  211. } catch (TaskFailException $e) {
  212. //no need requeue
  213. Log::error("Error processing message", [
  214. 'error' => $e->getMessage(),
  215. 'delivery_tag' => $msg->getDeliveryTag()
  216. ]);
  217. $msg->nack(false, false);
  218. } catch (\Exception $e) {
  219. // 处理失败,检查重试次数
  220. if ($retryCount < $maxRetries) {
  221. // 重新入队,延迟处理
  222. $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
  223. Log::warning("Message requeued for retry", [
  224. 'delivery_tag' => $msg->getDeliveryTag(),
  225. 'retry_count' => $retryCount + 1
  226. ]);
  227. } else {
  228. // 超过重试次数,拒绝消息(进入死信队列)
  229. $msg->nack(false, false);
  230. Log::error("Message rejected after max retries", [
  231. 'delivery_tag' => $msg->getDeliveryTag(),
  232. 'retry_count' => $retryCount
  233. ]);
  234. }
  235. }
  236. $iteration++;
  237. };
  238. $this->channel->basic_qos(null, 1, null);
  239. $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);
  240. Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);
  241. while ($this->channel->is_consuming() && $iteration < $maxIterations) {
  242. $this->channel->wait(null, false, $this->config['consumer']['sleep_between_iterations']);
  243. }
  244. Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
  245. }
  246. private function getRetryCount(AMQPMessage $msg): int
  247. {
  248. $headers = $msg->get_properties();
  249. return isset($headers['application_headers']['x-retry-count'])
  250. ? $headers['application_headers']['x-retry-count'] : 0;
  251. }
  252. private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
  253. {
  254. $delay = $this->config['queues'][$queueName]['retry_delay'];
  255. // 创建延迟队列
  256. $delayQueueName = "{$queueName}_delay_{$retryCount}";
  257. $arguments = new AMQPTable([
  258. 'x-message-ttl' => $delay,
  259. 'x-dead-letter-exchange' => '',
  260. 'x-dead-letter-routing-key' => $queueName,
  261. ]);
  262. $this->channel->queue_declare(
  263. $delayQueueName,
  264. false,
  265. true,
  266. false,
  267. false,
  268. false,
  269. $arguments
  270. );
  271. // 重新发布消息到延迟队列
  272. $data = json_decode($msg->getBody(), true);
  273. $newMessage = new AMQPMessage(
  274. json_encode($data),
  275. [
  276. 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  277. 'application_headers' => new AMQPTable([
  278. 'x-retry-count' => $retryCount
  279. ])
  280. ]
  281. );
  282. $this->channel->basic_publish($newMessage, '', $delayQueueName);
  283. $msg->ack();
  284. }
  285. public function close(): void
  286. {
  287. if ($this->channel) {
  288. $this->channel->close();
  289. }
  290. if ($this->connection) {
  291. $this->connection->close();
  292. }
  293. }
  294. }