RabbitMQWorker.php 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use App\Jobs\ProcessAITranslateJob;
  6. use App\Jobs\BaseRabbitMQJob;
  7. use Illuminate\Support\Facades\Log;
  8. use PhpAmqpLib\Exception\AMQPTimeoutException;
  9. use PhpAmqpLib\Wire\AMQPTable;
  10. use App\Services\RabbitMQService;
  11. use App\Exceptions\SectionTimeoutException;
  12. use App\Exceptions\TaskFailException;
  13. class RabbitMQWorker extends Command
  14. {
  15. /**
  16. * The name and signature of the console command.
  17. * php -d memory_limit=128M artisan rabbitmq:consume ai_translate
  18. * @var string
  19. */
  20. protected $signature = 'rabbitmq:consume {queue} {--reset-loop-count}';
  21. protected $description = '消费 RabbitMQ 队列消息';
  22. private $connection;
  23. private $channel;
  24. private $processedCount = 0;
  25. private $maxLoopCount = 0;
  26. private $queueName;
  27. private $queueConfig;
  28. private $shouldStop = false;
  29. private $timeout = 15;
  30. private $job = null;
  31. public function handle()
  32. {
  33. if (\App\Tools\Tools::isStop()) {
  34. return 0;
  35. }
  36. $this->queueName = $this->argument('queue');
  37. $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
  38. if (!$this->queueConfig) {
  39. $this->error("队列 {$this->queueName} 的配置不存在");
  40. return 1;
  41. }
  42. $this->maxLoopCount = $this->queueConfig['max_loop_count'];
  43. $this->info("启动 RabbitMQ Worker");
  44. $this->info("队列: {$this->queueName}");
  45. $this->info("最大循环次数: {$this->maxLoopCount}");
  46. $this->info("重试次数: {$this->queueConfig['retry_times']}");
  47. $consume = app(RabbitMQService::class);
  48. try {
  49. $consume->setupQueue($this->queueName);
  50. $this->channel = $consume->getChannel();
  51. $this->startConsuming();
  52. } catch (\Exception $e) {
  53. $this->error("Worker 启动失败: " . $e->getMessage());
  54. Log::error("RabbitMQ Worker 启动失败", [
  55. 'queue' => $this->queueName,
  56. 'error' => $e->getMessage()
  57. ]);
  58. return 1;
  59. } finally {
  60. $this->cleanup();
  61. }
  62. return 0;
  63. }
  64. private function startConsuming()
  65. {
  66. $callback = function (AMQPMessage $msg) {
  67. $this->processMessage($msg);
  68. };
  69. $this->channel->basic_consume(
  70. $this->queueName,
  71. '', // consumer_tag
  72. false, // no_local
  73. false, // no_ack
  74. false, // exclusive
  75. false, // nowait
  76. $callback
  77. );
  78. $this->info("开始消费消息... 按 Ctrl+C 退出");
  79. // 设置信号处理
  80. if (extension_loaded('pcntl')) {
  81. pcntl_signal(SIGTERM, [$this, 'handleSignal']);
  82. pcntl_signal(SIGINT, [$this, 'handleSignal']);
  83. }
  84. while ($this->channel->is_consuming() && !$this->shouldStop) {
  85. try {
  86. $this->channel->wait(null, false, $this->timeout);
  87. } catch (AMQPTimeoutException $e) {
  88. //忽略
  89. } catch (\Exception $e) {
  90. $this->error($e->getMessage());
  91. throw $e;
  92. }
  93. if (extension_loaded('pcntl')) {
  94. pcntl_signal_dispatch();
  95. }
  96. // 检查是否达到最大循环次数
  97. if ($this->processedCount >= $this->maxLoopCount) {
  98. $this->info("达到最大循环次数 ({$this->maxLoopCount}),Worker 自动退出");
  99. break;
  100. }
  101. if (\App\Tools\Tools::isStop()) {
  102. //检测到停止标记
  103. break;
  104. }
  105. }
  106. }
  107. private function processMessage(AMQPMessage $msg)
  108. {
  109. try {
  110. Log::info('processMessage start', ['message_id' => $msg->get('message_id')]);
  111. $data = json_decode($msg->getBody());
  112. $this->info("processMessage start " . $msg->get('message_id') . '[' . count($data) . ']');
  113. if (json_last_error() !== JSON_ERROR_NONE) {
  114. throw new \Exception("JSON 解析失败: " . json_last_error_msg());
  115. }
  116. // 获取重试次数(从消息头中获取)
  117. $retryCount = 0;
  118. if ($msg->has('application_headers')) {
  119. $headers = $msg->get('application_headers')->getNativeData();
  120. $retryCount = $headers['retry_count'] ?? 0;
  121. }
  122. // 根据队列类型创建对应的 Job
  123. $this->job = $this->createJob($msg->get('message_id'), $data, $retryCount);
  124. try {
  125. // 执行业务逻辑
  126. $this->job->handle();
  127. // 成功处理,确认消息
  128. $msg->ack();
  129. $this->processedCount++;
  130. $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
  131. } catch (SectionTimeoutException $e) {
  132. $msg->nack(true, false);
  133. Log::warning('attempt to requeue the message message_id:' . $msg->get('message_id'));
  134. } catch (TaskFailException $e) {
  135. $msg->nack(false, false);
  136. } catch (\Exception $e) {
  137. //requeue
  138. $this->handleJobException($msg, $data, $retryCount, $e);
  139. }
  140. } catch (\Exception $e) {
  141. $this->error("消息处理异常: " . $e->getMessage());
  142. Log::error("RabbitMQ 消息处理异常", [
  143. 'queue' => $this->queueName,
  144. 'error' => $e->getMessage(),
  145. 'message_body' => $msg->getBody()
  146. ]);
  147. // 拒绝消息并发送到死信队列
  148. //$msg->nack(false, false);
  149. $this->sendToDeadLetterQueue($data, $e);
  150. $msg->ack(); // 确认原消息以避免重复
  151. $this->error("已发送到死信队列");
  152. $this->processedCount++;
  153. }
  154. }
  155. private function createJob(string $messageId, array $data, int $retryCount): BaseRabbitMQJob
  156. {
  157. // 根据队列名称创建对应的 Job 实例
  158. switch ($this->queueName) {
  159. case 'ai_translate':
  160. return new ProcessAITranslateJob(
  161. $this->queueName,
  162. $messageId,
  163. $data,
  164. $retryCount,
  165. );
  166. // 可以添加更多队列类型
  167. default:
  168. throw new \Exception("未知的队列类型: {$this->queueName}");
  169. }
  170. }
  171. private function handleJobException(AMQPMessage $msg, array $data, int $retryCount, \Exception $e)
  172. {
  173. $maxRetries = $this->queueConfig['retry_times'];
  174. if ($retryCount < $maxRetries - 1) {
  175. // 还有重试机会,重新入队
  176. $this->requeueMessage($msg, $data, $retryCount + 1);
  177. $this->info("消息重新入队,重试次数: " . ($retryCount + 1) . "/{$maxRetries}");
  178. } else {
  179. // 超过重试次数,发送到死信队列
  180. $this->sendToDeadLetterQueue($data, $e);
  181. $msg->ack(); // 确认原消息以避免重复
  182. $this->error("消息超过最大重试次数,已发送到死信队列 ");
  183. Log::error("消息超过最大重试次数,已发送到死信队列 message_id=" . $msg->get('message_id'));
  184. }
  185. $this->processedCount++;
  186. }
  187. private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
  188. {
  189. // 添加重试计数到消息头
  190. // 使用 AMQPTable 包装头部数据
  191. $headers = new AMQPTable([
  192. 'retry_count' => $newRetryCount,
  193. 'original_queue' => $this->queueName,
  194. 'retry_timestamp' => time()
  195. ]);
  196. $newMsg = new AMQPMessage(
  197. json_encode($data, JSON_UNESCAPED_UNICODE),
  198. [
  199. 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  200. 'timestamp' => time(),
  201. 'message_id' => $msg->get('message_id'),
  202. 'application_headers' => $headers,
  203. "content_type" => 'application/json; charset=utf-8'
  204. ]
  205. );
  206. // 发布到同一队列
  207. $this->channel->basic_publish($newMsg, '', $this->queueName);
  208. // 确认原消息
  209. $msg->ack();
  210. }
  211. private function sendToDeadLetterQueue(array $data, \Exception $e)
  212. {
  213. $dlqName = $this->queueConfig['dead_letter_queue'];
  214. $dlqData = [
  215. 'original_message' => $data,
  216. 'failure_reason' => $e->getMessage(),
  217. 'failed_at' => date('Y-m-d H:i:s'),
  218. 'queue' => $this->queueName,
  219. 'max_retries' => $this->queueConfig['retry_times']
  220. ];
  221. $dlqMsg = new AMQPMessage(
  222. json_encode($dlqData),
  223. ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
  224. );
  225. $this->channel->basic_publish($dlqMsg, '', $dlqName);
  226. Log::error("消息发送到死信队列", [
  227. 'original_queue' => $this->queueName,
  228. 'dead_letter_queue' => $dlqName,
  229. 'error' => $e->getMessage()
  230. ]);
  231. }
  232. public function handleSignal($signal)
  233. {
  234. $this->info("接收到退出信号,正在优雅关闭...");
  235. $this->shouldStop = true;
  236. if ($this->job) {
  237. $this->job->stop();
  238. }
  239. if ($this->channel && $this->channel->is_consuming()) {
  240. //$this->channel->basic_cancel_on_shutdown(true);
  241. $this->channel->basic_cancel('');
  242. }
  243. }
  244. private function cleanup()
  245. {
  246. try {
  247. if ($this->channel) {
  248. $this->channel->close();
  249. }
  250. if ($this->connection) {
  251. $this->connection->close();
  252. }
  253. $this->info("连接已关闭,处理了 {$this->processedCount} 条消息");
  254. } catch (\Exception $e) {
  255. $this->error("清理资源时出错: " . $e->getMessage());
  256. }
  257. }
  258. }