Procházet zdrojové kódy

检测到停止标记退出

visuddhinanda před 9 měsíci
rodič
revize
ae724762e8
1 změnil soubory, kde provedl 26 přidání a 57 odebrání
  1. 26 57
      api-v8/app/Console/Commands/RabbitMQWorker.php

+ 26 - 57
api-v8/app/Console/Commands/RabbitMQWorker.php

@@ -3,9 +3,7 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
 use PhpAmqpLib\Message\AMQPMessage;
-use PhpAmqpLib\Channel\AMQPChannel;
 use App\Jobs\ProcessAITranslateJob;
 use App\Jobs\BaseRabbitMQJob;
 use Illuminate\Support\Facades\Log;
@@ -17,7 +15,7 @@ class RabbitMQWorker extends Command
 {
     /**
      * The name and signature of the console command.
-     * php artisan rabbitmq:consume ai_translate
+     * php -d memory_limit=128M artisan rabbitmq:consume ai_translate
      * @var string
      */
     protected $signature = 'rabbitmq:consume {queue} {--reset-loop-count}';
@@ -31,8 +29,13 @@ class RabbitMQWorker extends Command
     private $queueConfig;
     private $shouldStop = false;
     private $timeout = 15;
+    private $job = null;
+
     public function handle(RabbitMQService $consume)
     {
+        if (\App\Tools\Tools::isStop()) {
+            return 0;
+        }
         $this->queueName = $this->argument('queue');
         $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
 
@@ -66,50 +69,6 @@ class RabbitMQWorker extends Command
         return 0;
     }
 
-    /*
-    private function setupQueues()
-    {
-        // 声明主队列
-        $arguments = new AMQPTable([
-            'x-dead-letter-exchange' => '',
-            'x-dead-letter-routing-key' => $this->queueConfig['dead_letter_queue'], // 死信路由键
-        ]);
-        $this->channel->queue_declare(
-            $this->queueName,
-            false,  // passive
-            true,   // durable
-            false,  // exclusive
-            false,  // auto_delete
-            false,  // nowait
-            $arguments
-        );
-
-        // 声明死信队列
-        $dlqName = $this->queueConfig['dead_letter_queue'];
-        $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
-
-        $dlqArgs = [];
-        if (isset($dlqConfig['ttl'])) {
-            $dlqArgs['x-message-ttl'] =  $dlqConfig['ttl'];
-        }
-        if (isset($dlqConfig['max_length'])) {
-            $dlqArgs['x-max-length'] =  $dlqConfig['max_length'];
-        }
-        $dlqArguments = new AMQPTable($dlqArgs);
-
-        $this->channel->queue_declare(
-            $dlqName,
-            false,  // passive
-            true,   // durable
-            false,  // exclusive
-            false,  // auto_delete
-            false,  // nowait
-            $dlqArguments
-        );
-
-        $this->info("队列设置完成,死信队列: {$dlqName}");
-    }
-*/
     private function startConsuming()
     {
         $callback = function (AMQPMessage $msg) {
@@ -151,15 +110,21 @@ class RabbitMQWorker extends Command
                 $this->info("达到最大循环次数 ({$this->maxLoopCount}),Worker 自动退出");
                 break;
             }
+            if (\App\Tools\Tools::isStop()) {
+                //检测到停止标记
+                break;
+            }
         }
     }
 
     private function processMessage(AMQPMessage $msg)
     {
         try {
+
             Log::info('processMessage start', ['message_id' => $msg->get('message_id')]);
 
-            $data = json_decode($msg->getBody(), true);
+            $data = json_decode($msg->getBody());
+            $this->info("processMessage start " . $msg->get('message_id') . '[' . count($data) . ']');
 
             if (json_last_error() !== JSON_ERROR_NONE) {
                 throw new \Exception("JSON 解析失败: " . json_last_error_msg());
@@ -173,14 +138,16 @@ class RabbitMQWorker extends Command
             }
 
             // 根据队列类型创建对应的 Job
-            $job = $this->createJob($data, $retryCount);
+            $this->job = $this->createJob($msg->get('message_id'), $data, $retryCount);
 
             try {
                 // 执行业务逻辑
-                $job->handle();
+                $successful = $this->job->handle();
+                if ($successful) {
+                    // 成功处理,确认消息
+                    $msg->ack();
+                }
 
-                // 成功处理,确认消息
-                $msg->ack();
                 $this->processedCount++;
 
                 $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
@@ -204,12 +171,12 @@ class RabbitMQWorker extends Command
         }
     }
 
-    private function createJob(array $data, int $retryCount): BaseRabbitMQJob
+    private function createJob(string $messageId, array $data, int $retryCount): BaseRabbitMQJob
     {
         // 根据队列名称创建对应的 Job 实例
         switch ($this->queueName) {
             case 'ai_translate':
-                return new ProcessAITranslateJob($this->queueName, $data, $retryCount);
+                return new ProcessAITranslateJob($this->queueName, $messageId, $data, $retryCount);
                 // 可以添加更多队列类型
             default:
                 throw new \Exception("未知的队列类型: {$this->queueName}");
@@ -228,7 +195,8 @@ class RabbitMQWorker extends Command
             // 超过重试次数,发送到死信队列
             $this->sendToDeadLetterQueue($data, $e);
             $msg->ack(); // 确认原消息以避免重复
-            $this->error("消息超过最大重试次数,已发送到死信队列");
+            $this->error("消息超过最大重试次数,已发送到死信队列 ");
+            Log::error("消息超过最大重试次数,已发送到死信队列 message_id=" . $msg->get('message_id'));
         }
 
         $this->processedCount++;
@@ -284,7 +252,6 @@ class RabbitMQWorker extends Command
         Log::error("消息发送到死信队列", [
             'original_queue' => $this->queueName,
             'dead_letter_queue' => $dlqName,
-            'data' => $data,
             'error' => $e->getMessage()
         ]);
     }
@@ -293,7 +260,9 @@ class RabbitMQWorker extends Command
     {
         $this->info("接收到退出信号,正在优雅关闭...");
         $this->shouldStop = true;
-
+        if ($this->job) {
+            $this->job->stop();
+        }
         if ($this->channel && $this->channel->is_consuming()) {
             //$this->channel->basic_cancel_on_shutdown(true);
             $this->channel->basic_cancel('');