RabbitMQWorker.php 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Channel\AMQPChannel;
  7. use App\Jobs\ProcessAITranslateJob;
  8. use App\Jobs\BaseRabbitMQJob;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Exception\AMQPTimeoutException;
  11. class RabbitMQWorker extends Command
  12. {
  13. /**
  14. * The name and signature of the console command.
  15. * php artisan rabbitmq:consume ai_translate
  16. * @var string
  17. */
  18. protected $signature = 'rabbitmq:consume {queue} {--reset-loop-count}';
  19. protected $description = '消费 RabbitMQ 队列消息';
  20. private $connection;
  21. private $channel;
  22. private $processedCount = 0;
  23. private $maxLoopCount = 0;
  24. private $queueName;
  25. private $queueConfig;
  26. private $shouldStop = false;
  27. private $timeout = 15;
  28. public function handle()
  29. {
  30. $this->queueName = $this->argument('queue');
  31. $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
  32. if (!$this->queueConfig) {
  33. $this->error("队列 {$this->queueName} 的配置不存在");
  34. return 1;
  35. }
  36. $this->maxLoopCount = $this->queueConfig['max_loop_count'];
  37. $this->info("启动 RabbitMQ Worker");
  38. $this->info("队列: {$this->queueName}");
  39. $this->info("最大循环次数: {$this->maxLoopCount}");
  40. $this->info("重试次数: {$this->queueConfig['retry_times']}");
  41. try {
  42. $this->setupConnection();
  43. $this->setupQueues();
  44. $this->startConsuming();
  45. } catch (\Exception $e) {
  46. $this->error("Worker 启动失败: " . $e->getMessage());
  47. Log::error("RabbitMQ Worker 启动失败", [
  48. 'queue' => $this->queueName,
  49. 'error' => $e->getMessage()
  50. ]);
  51. return 1;
  52. } finally {
  53. $this->cleanup();
  54. }
  55. return 0;
  56. }
  57. private function setupConnection()
  58. {
  59. $config = config('queue.connections.rabbitmq');
  60. $this->connection = new AMQPStreamConnection(
  61. $config['host'],
  62. $config['port'],
  63. $config['user'],
  64. $config['password'],
  65. $config['virtual_host']
  66. );
  67. $this->channel = $this->connection->channel();
  68. // 设置 QoS - 每次只处理一条消息
  69. $this->channel->basic_qos(null, 1, null);
  70. }
  71. private function setupQueues()
  72. {
  73. // 声明主队列
  74. $this->channel->queue_declare(
  75. $this->queueName,
  76. false, // passive
  77. true, // durable
  78. false, // exclusive
  79. false, // auto_delete
  80. false, // nowait
  81. [
  82. 'x-dead-letter-exchange' => ['S', ''],
  83. 'x-dead-letter-routing-key' => ['S', $this->queueConfig['dead_letter_queue']]
  84. ]
  85. );
  86. // 声明死信队列
  87. $dlqName = $this->queueConfig['dead_letter_queue'];
  88. $dlqConfig = config("rabbitmq.dead_letter_queues.{$dlqName}", []);
  89. $dlqArgs = [];
  90. if (isset($dlqConfig['ttl'])) {
  91. $dlqArgs['x-message-ttl'] = ['I', $dlqConfig['ttl']];
  92. }
  93. if (isset($dlqConfig['max_length'])) {
  94. $dlqArgs['x-max-length'] = ['I', $dlqConfig['max_length']];
  95. }
  96. $this->channel->queue_declare(
  97. $dlqName,
  98. false, // passive
  99. true, // durable
  100. false, // exclusive
  101. false, // auto_delete
  102. false, // nowait
  103. $dlqArgs
  104. );
  105. $this->info("队列设置完成,死信队列: {$dlqName}");
  106. }
  107. private function startConsuming()
  108. {
  109. $callback = function (AMQPMessage $msg) {
  110. $this->processMessage($msg);
  111. };
  112. $this->channel->basic_consume(
  113. $this->queueName,
  114. '', // consumer_tag
  115. false, // no_local
  116. false, // no_ack
  117. false, // exclusive
  118. false, // nowait
  119. $callback
  120. );
  121. $this->info("开始消费消息... 按 Ctrl+C 退出");
  122. // 设置信号处理
  123. if (extension_loaded('pcntl')) {
  124. pcntl_signal(SIGTERM, [$this, 'handleSignal']);
  125. pcntl_signal(SIGINT, [$this, 'handleSignal']);
  126. }
  127. while ($this->channel->is_consuming() && !$this->shouldStop) {
  128. try {
  129. $this->channel->wait(null, false, $this->timeout);
  130. } catch (AMQPTimeoutException $e) {
  131. // ignore it
  132. }
  133. if (extension_loaded('pcntl')) {
  134. pcntl_signal_dispatch();
  135. }
  136. // 检查是否达到最大循环次数
  137. if ($this->processedCount >= $this->maxLoopCount) {
  138. $this->info("达到最大循环次数 ({$this->maxLoopCount}),Worker 自动退出");
  139. break;
  140. }
  141. }
  142. }
  143. private function processMessage(AMQPMessage $msg)
  144. {
  145. try {
  146. $data = json_decode($msg->body, true);
  147. if (json_last_error() !== JSON_ERROR_NONE) {
  148. throw new \Exception("JSON 解析失败: " . json_last_error_msg());
  149. }
  150. // 获取重试次数(从消息头中获取)
  151. $retryCount = 0;
  152. if ($msg->has('application_headers')) {
  153. $headers = $msg->get('application_headers')->getNativeData();
  154. $retryCount = $headers['retry_count'] ?? 0;
  155. }
  156. // 根据队列类型创建对应的 Job
  157. $job = $this->createJob($data, $retryCount);
  158. try {
  159. // 执行业务逻辑
  160. $job->handle();
  161. // 成功处理,确认消息
  162. $msg->ack();
  163. $this->processedCount++;
  164. $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
  165. } catch (\Exception $e) {
  166. $this->handleJobException($msg, $data, $retryCount, $e);
  167. }
  168. } catch (\Exception $e) {
  169. $this->error("消息处理异常: " . $e->getMessage());
  170. Log::error("RabbitMQ 消息处理异常", [
  171. 'queue' => $this->queueName,
  172. 'error' => $e->getMessage(),
  173. 'message_body' => $msg->body
  174. ]);
  175. // 拒绝消息并发送到死信队列
  176. $msg->nack(false, false);
  177. $this->processedCount++;
  178. }
  179. }
  180. private function createJob(array $data, int $retryCount): BaseRabbitMQJob
  181. {
  182. // 根据队列名称创建对应的 Job 实例
  183. switch ($this->queueName) {
  184. case 'ai_translate':
  185. return new ProcessAITranslateJob($this->queueName, $data, $retryCount);
  186. // 可以添加更多队列类型
  187. default:
  188. throw new \Exception("未知的队列类型: {$this->queueName}");
  189. }
  190. }
  191. private function handleJobException(AMQPMessage $msg, array $data, int $retryCount, \Exception $e)
  192. {
  193. $maxRetries = $this->queueConfig['retry_times'];
  194. if ($retryCount < $maxRetries - 1) {
  195. // 还有重试机会,重新入队
  196. $this->requeueMessage($msg, $data, $retryCount + 1);
  197. $this->info("消息重新入队,重试次数: " . ($retryCount + 1) . "/{$maxRetries}");
  198. } else {
  199. // 超过重试次数,发送到死信队列
  200. $this->sendToDeadLetterQueue($data, $e);
  201. $msg->ack(); // 确认原消息以避免重复
  202. $this->error("消息超过最大重试次数,已发送到死信队列");
  203. }
  204. $this->processedCount++;
  205. }
  206. private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
  207. {
  208. // 添加重试计数到消息头
  209. $headers = [
  210. 'retry_count' => $newRetryCount,
  211. 'original_queue' => $this->queueName,
  212. 'retry_timestamp' => time()
  213. ];
  214. $newMsg = new AMQPMessage(
  215. json_encode($data),
  216. [
  217. 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  218. 'application_headers' => $headers
  219. ]
  220. );
  221. // 发布到同一队列
  222. $this->channel->basic_publish($newMsg, '', $this->queueName);
  223. // 确认原消息
  224. $msg->ack();
  225. }
  226. private function sendToDeadLetterQueue(array $data, \Exception $e)
  227. {
  228. $dlqName = $this->queueConfig['dead_letter_queue'];
  229. $dlqData = [
  230. 'original_message' => $data,
  231. 'failure_reason' => $e->getMessage(),
  232. 'failed_at' => date('Y-m-d H:i:s'),
  233. 'queue' => $this->queueName,
  234. 'max_retries' => $this->queueConfig['retry_times']
  235. ];
  236. $dlqMsg = new AMQPMessage(
  237. json_encode($dlqData),
  238. ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
  239. );
  240. $this->channel->basic_publish($dlqMsg, '', $dlqName);
  241. Log::error("消息发送到死信队列", [
  242. 'original_queue' => $this->queueName,
  243. 'dead_letter_queue' => $dlqName,
  244. 'data' => $data,
  245. 'error' => $e->getMessage()
  246. ]);
  247. }
  248. public function handleSignal($signal)
  249. {
  250. $this->info("接收到退出信号,正在优雅关闭...");
  251. $this->shouldStop = true;
  252. if ($this->channel && $this->channel->is_consuming()) {
  253. $this->channel->basic_cancel_on_shutdown(true);
  254. }
  255. }
  256. private function cleanup()
  257. {
  258. try {
  259. if ($this->channel) {
  260. $this->channel->close();
  261. }
  262. if ($this->connection) {
  263. $this->connection->close();
  264. }
  265. $this->info("连接已关闭,处理了 {$this->processedCount} 条消息");
  266. } catch (\Exception $e) {
  267. $this->error("清理资源时出错: " . $e->getMessage());
  268. }
  269. }
  270. }