RabbitMQWorker.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Channel\AMQPChannel;
  7. use App\Jobs\ProcessAITranslateJob;
  8. use App\Jobs\BaseRabbitMQJob;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Exception\AMQPTimeoutException;
  11. use PhpAmqpLib\Wire\AMQPTable;
  12. use App\Services\RabbitMQService;
  13. class RabbitMQWorker extends Command
  14. {
  15. /**
  16. * The name and signature of the console command.
  17. * php artisan rabbitmq:consume ai_translate
  18. * @var string
  19. */
  20. protected $signature = 'rabbitmq:consume {queue} {--reset-loop-count}';
  21. protected $description = '消费 RabbitMQ 队列消息';
  22. private $connection;
  23. private $channel;
  24. private $processedCount = 0;
  25. private $maxLoopCount = 0;
  26. private $queueName;
  27. private $queueConfig;
  28. private $shouldStop = false;
  29. private $timeout = 15;
  30. public function handle(RabbitMQService $consume)
  31. {
  32. $this->queueName = $this->argument('queue');
  33. $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
  34. if (!$this->queueConfig) {
  35. $this->error("队列 {$this->queueName} 的配置不存在");
  36. return 1;
  37. }
  38. $this->maxLoopCount = $this->queueConfig['max_loop_count'];
  39. $this->info("启动 RabbitMQ Worker");
  40. $this->info("队列: {$this->queueName}");
  41. $this->info("最大循环次数: {$this->maxLoopCount}");
  42. $this->info("重试次数: {$this->queueConfig['retry_times']}");
  43. try {
  44. $consume->setupQueue($this->queueName);
  45. $this->channel = $consume->getChannel();
  46. $this->startConsuming();
  47. } catch (\Exception $e) {
  48. $this->error("Worker 启动失败: " . $e->getMessage());
  49. Log::error("RabbitMQ Worker 启动失败", [
  50. 'queue' => $this->queueName,
  51. 'error' => $e->getMessage()
  52. ]);
  53. return 1;
  54. } finally {
  55. $this->cleanup();
  56. }
  57. return 0;
  58. }
  59. /*
  60. private function setupQueues()
  61. {
  62. // 声明主队列
  63. $arguments = new AMQPTable([
  64. 'x-dead-letter-exchange' => '',
  65. 'x-dead-letter-routing-key' => $this->queueConfig['dead_letter_queue'], // 死信路由键
  66. ]);
  67. $this->channel->queue_declare(
  68. $this->queueName,
  69. false, // passive
  70. true, // durable
  71. false, // exclusive
  72. false, // auto_delete
  73. false, // nowait
  74. $arguments
  75. );
  76. // 声明死信队列
  77. $dlqName = $this->queueConfig['dead_letter_queue'];
  78. $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
  79. $dlqArgs = [];
  80. if (isset($dlqConfig['ttl'])) {
  81. $dlqArgs['x-message-ttl'] = $dlqConfig['ttl'];
  82. }
  83. if (isset($dlqConfig['max_length'])) {
  84. $dlqArgs['x-max-length'] = $dlqConfig['max_length'];
  85. }
  86. $dlqArguments = new AMQPTable($dlqArgs);
  87. $this->channel->queue_declare(
  88. $dlqName,
  89. false, // passive
  90. true, // durable
  91. false, // exclusive
  92. false, // auto_delete
  93. false, // nowait
  94. $dlqArguments
  95. );
  96. $this->info("队列设置完成,死信队列: {$dlqName}");
  97. }
  98. */
  99. private function startConsuming()
  100. {
  101. $callback = function (AMQPMessage $msg) {
  102. $this->processMessage($msg);
  103. };
  104. $this->channel->basic_consume(
  105. $this->queueName,
  106. '', // consumer_tag
  107. false, // no_local
  108. false, // no_ack
  109. false, // exclusive
  110. false, // nowait
  111. $callback
  112. );
  113. $this->info("开始消费消息... 按 Ctrl+C 退出");
  114. // 设置信号处理
  115. if (extension_loaded('pcntl')) {
  116. pcntl_signal(SIGTERM, [$this, 'handleSignal']);
  117. pcntl_signal(SIGINT, [$this, 'handleSignal']);
  118. }
  119. while ($this->channel->is_consuming() && !$this->shouldStop) {
  120. try {
  121. $this->channel->wait(null, false, $this->timeout);
  122. } catch (AMQPTimeoutException $e) {
  123. // ignore it
  124. }
  125. if (extension_loaded('pcntl')) {
  126. pcntl_signal_dispatch();
  127. }
  128. // 检查是否达到最大循环次数
  129. if ($this->processedCount >= $this->maxLoopCount) {
  130. $this->info("达到最大循环次数 ({$this->maxLoopCount}),Worker 自动退出");
  131. break;
  132. }
  133. }
  134. }
  135. private function processMessage(AMQPMessage $msg)
  136. {
  137. try {
  138. Log::info('processMessage start', ['message_id' => $msg->get('message_id')]);
  139. $data = json_decode($msg->getBody(), true);
  140. if (json_last_error() !== JSON_ERROR_NONE) {
  141. throw new \Exception("JSON 解析失败: " . json_last_error_msg());
  142. }
  143. // 获取重试次数(从消息头中获取)
  144. $retryCount = 0;
  145. if ($msg->has('application_headers')) {
  146. $headers = $msg->get('application_headers')->getNativeData();
  147. $retryCount = $headers['retry_count'] ?? 0;
  148. }
  149. // 根据队列类型创建对应的 Job
  150. $job = $this->createJob($data, $retryCount);
  151. try {
  152. // 执行业务逻辑
  153. $job->handle();
  154. // 成功处理,确认消息
  155. $msg->ack();
  156. $this->processedCount++;
  157. $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
  158. } catch (\Exception $e) {
  159. $this->handleJobException($msg, $data, $retryCount, $e);
  160. }
  161. } catch (\Exception $e) {
  162. $this->error("消息处理异常: " . $e->getMessage());
  163. Log::error("RabbitMQ 消息处理异常", [
  164. 'queue' => $this->queueName,
  165. 'error' => $e->getMessage(),
  166. 'message_body' => $msg->getBody()
  167. ]);
  168. // 拒绝消息并发送到死信队列
  169. //$msg->nack(false, false);
  170. $this->sendToDeadLetterQueue($data, $e);
  171. $msg->ack(); // 确认原消息以避免重复
  172. $this->error("已发送到死信队列");
  173. $this->processedCount++;
  174. }
  175. }
  176. private function createJob(array $data, int $retryCount): BaseRabbitMQJob
  177. {
  178. // 根据队列名称创建对应的 Job 实例
  179. switch ($this->queueName) {
  180. case 'ai_translate':
  181. return new ProcessAITranslateJob($this->queueName, $data, $retryCount);
  182. // 可以添加更多队列类型
  183. default:
  184. throw new \Exception("未知的队列类型: {$this->queueName}");
  185. }
  186. }
  187. private function handleJobException(AMQPMessage $msg, array $data, int $retryCount, \Exception $e)
  188. {
  189. $maxRetries = $this->queueConfig['retry_times'];
  190. if ($retryCount < $maxRetries - 1) {
  191. // 还有重试机会,重新入队
  192. $this->requeueMessage($msg, $data, $retryCount + 1);
  193. $this->info("消息重新入队,重试次数: " . ($retryCount + 1) . "/{$maxRetries}");
  194. } else {
  195. // 超过重试次数,发送到死信队列
  196. $this->sendToDeadLetterQueue($data, $e);
  197. $msg->ack(); // 确认原消息以避免重复
  198. $this->error("消息超过最大重试次数,已发送到死信队列");
  199. }
  200. $this->processedCount++;
  201. }
  202. private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
  203. {
  204. // 添加重试计数到消息头
  205. // 使用 AMQPTable 包装头部数据
  206. $headers = new AMQPTable([
  207. 'retry_count' => $newRetryCount,
  208. 'original_queue' => $this->queueName,
  209. 'retry_timestamp' => time()
  210. ]);
  211. $newMsg = new AMQPMessage(
  212. json_encode($data, JSON_UNESCAPED_UNICODE),
  213. [
  214. 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  215. 'timestamp' => time(),
  216. 'message_id' => $msg->get('message_id'),
  217. 'application_headers' => $headers,
  218. "content_type" => 'application/json; charset=utf-8'
  219. ]
  220. );
  221. // 发布到同一队列
  222. $this->channel->basic_publish($newMsg, '', $this->queueName);
  223. // 确认原消息
  224. $msg->ack();
  225. }
  226. private function sendToDeadLetterQueue(array $data, \Exception $e)
  227. {
  228. $dlqName = $this->queueConfig['dead_letter_queue'];
  229. $dlqData = [
  230. 'original_message' => $data,
  231. 'failure_reason' => $e->getMessage(),
  232. 'failed_at' => date('Y-m-d H:i:s'),
  233. 'queue' => $this->queueName,
  234. 'max_retries' => $this->queueConfig['retry_times']
  235. ];
  236. $dlqMsg = new AMQPMessage(
  237. json_encode($dlqData),
  238. ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
  239. );
  240. $this->channel->basic_publish($dlqMsg, '', $dlqName);
  241. Log::error("消息发送到死信队列", [
  242. 'original_queue' => $this->queueName,
  243. 'dead_letter_queue' => $dlqName,
  244. 'data' => $data,
  245. 'error' => $e->getMessage()
  246. ]);
  247. }
  248. public function handleSignal($signal)
  249. {
  250. $this->info("接收到退出信号,正在优雅关闭...");
  251. $this->shouldStop = true;
  252. if ($this->channel && $this->channel->is_consuming()) {
  253. //$this->channel->basic_cancel_on_shutdown(true);
  254. $this->channel->basic_cancel('');
  255. }
  256. }
  257. private function cleanup()
  258. {
  259. try {
  260. if ($this->channel) {
  261. $this->channel->close();
  262. }
  263. if ($this->connection) {
  264. $this->connection->close();
  265. }
  266. $this->info("连接已关闭,处理了 {$this->processedCount} 条消息");
  267. } catch (\Exception $e) {
  268. $this->error("清理资源时出错: " . $e->getMessage());
  269. }
  270. }
  271. }