Просмотр исходного кода

handle(RabbitMQService $consume)

visuddhinanda 10 месяцев назад
Родитель
Сommit
a032a60030
1 измененных файлов с 35 добавлено и 38 удалено
  1. 35 38
      api-v8/app/Console/Commands/RabbitMQWorker.php

+ 35 - 38
api-v8/app/Console/Commands/RabbitMQWorker.php

@@ -10,7 +10,8 @@ use App\Jobs\ProcessAITranslateJob;
 use App\Jobs\BaseRabbitMQJob;
 use Illuminate\Support\Facades\Log;
 use PhpAmqpLib\Exception\AMQPTimeoutException;
-
+use PhpAmqpLib\Wire\AMQPTable;
+use App\Services\RabbitMQService;
 
 class RabbitMQWorker extends Command
 {
@@ -30,7 +31,7 @@ class RabbitMQWorker extends Command
     private $queueConfig;
     private $shouldStop = false;
     private $timeout = 15;
-    public function handle()
+    public function handle(RabbitMQService $consume)
     {
         $this->queueName = $this->argument('queue');
         $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
@@ -48,8 +49,8 @@ class RabbitMQWorker extends Command
         $this->info("重试次数: {$this->queueConfig['retry_times']}");
 
         try {
-            $this->setupConnection();
-            $this->setupQueues();
+            $consume->setupQueue($this->queueName);
+            $this->channel = $consume->getChannel();
             $this->startConsuming();
         } catch (\Exception $e) {
             $this->error("Worker 启动失败: " . $e->getMessage());
@@ -65,26 +66,14 @@ class RabbitMQWorker extends Command
         return 0;
     }
 
-    private function setupConnection()
-    {
-        $config = config('queue.connections.rabbitmq');
-        $this->connection = new AMQPStreamConnection(
-            $config['host'],
-            $config['port'],
-            $config['user'],
-            $config['password'],
-            $config['virtual_host']
-        );
-
-        $this->channel = $this->connection->channel();
-
-        // 设置 QoS - 每次只处理一条消息
-        $this->channel->basic_qos(null, 1, null);
-    }
-
+    /*
     private function setupQueues()
     {
         // 声明主队列
+        $arguments = new AMQPTable([
+            'x-dead-letter-exchange' => '',
+            'x-dead-letter-routing-key' => $this->queueConfig['dead_letter_queue'], // 死信路由键
+        ]);
         $this->channel->queue_declare(
             $this->queueName,
             false,  // passive
@@ -92,23 +81,21 @@ class RabbitMQWorker extends Command
             false,  // exclusive
             false,  // auto_delete
             false,  // nowait
-            [
-                'x-dead-letter-exchange' => ['S', ''],
-                'x-dead-letter-routing-key' => ['S', $this->queueConfig['dead_letter_queue']]
-            ]
+            $arguments
         );
 
         // 声明死信队列
         $dlqName = $this->queueConfig['dead_letter_queue'];
-        $dlqConfig = config("rabbitmq.dead_letter_queues.{$dlqName}", []);
+        $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
 
         $dlqArgs = [];
         if (isset($dlqConfig['ttl'])) {
-            $dlqArgs['x-message-ttl'] = ['I', $dlqConfig['ttl']];
+            $dlqArgs['x-message-ttl'] =  $dlqConfig['ttl'];
         }
         if (isset($dlqConfig['max_length'])) {
-            $dlqArgs['x-max-length'] = ['I', $dlqConfig['max_length']];
+            $dlqArgs['x-max-length'] =  $dlqConfig['max_length'];
         }
+        $dlqArguments = new AMQPTable($dlqArgs);
 
         $this->channel->queue_declare(
             $dlqName,
@@ -117,12 +104,12 @@ class RabbitMQWorker extends Command
             false,  // exclusive
             false,  // auto_delete
             false,  // nowait
-            $dlqArgs
+            $dlqArguments
         );
 
         $this->info("队列设置完成,死信队列: {$dlqName}");
     }
-
+*/
     private function startConsuming()
     {
         $callback = function (AMQPMessage $msg) {
@@ -170,7 +157,9 @@ class RabbitMQWorker extends Command
     private function processMessage(AMQPMessage $msg)
     {
         try {
-            $data = json_decode($msg->body, true);
+            Log::info('processMessage start', ['message_id' => $msg->get('message_id')]);
+
+            $data = json_decode($msg->getBody(), true);
 
             if (json_last_error() !== JSON_ERROR_NONE) {
                 throw new \Exception("JSON 解析失败: " . json_last_error_msg());
@@ -203,11 +192,14 @@ class RabbitMQWorker extends Command
             Log::error("RabbitMQ 消息处理异常", [
                 'queue' => $this->queueName,
                 'error' => $e->getMessage(),
-                'message_body' => $msg->body
+                'message_body' => $msg->getBody()
             ]);
 
             // 拒绝消息并发送到死信队列
-            $msg->nack(false, false);
+            //$msg->nack(false, false);
+            $this->sendToDeadLetterQueue($data, $e);
+            $msg->ack(); // 确认原消息以避免重复
+            $this->error("已发送到死信队列");
             $this->processedCount++;
         }
     }
@@ -245,17 +237,21 @@ class RabbitMQWorker extends Command
     private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
     {
         // 添加重试计数到消息头
-        $headers = [
+        // 使用 AMQPTable 包装头部数据
+        $headers = new AMQPTable([
             'retry_count' => $newRetryCount,
             'original_queue' => $this->queueName,
             'retry_timestamp' => time()
-        ];
+        ]);
 
         $newMsg = new AMQPMessage(
-            json_encode($data),
+            json_encode($data, JSON_UNESCAPED_UNICODE),
             [
                 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
-                'application_headers' => $headers
+                'timestamp' => time(),
+                'message_id' => $msg->get('message_id'),
+                'application_headers' => $headers,
+                "content_type" => 'application/json; charset=utf-8'
             ]
         );
 
@@ -299,7 +295,8 @@ class RabbitMQWorker extends Command
         $this->shouldStop = true;
 
         if ($this->channel && $this->channel->is_consuming()) {
-            $this->channel->basic_cancel_on_shutdown(true);
+            //$this->channel->basic_cancel_on_shutdown(true);
+            $this->channel->basic_cancel('');
         }
     }