Переглянути джерело

SectionTimeoutException 重新入队

visuddhinanda 9 місяців тому
батько
коміт
46865b6521
1 змінених файлів з 14 додано та 2 видалено
  1. 14 2
      api-v8/app/Console/Commands/RabbitMQWorker.php

+ 14 - 2
api-v8/app/Console/Commands/RabbitMQWorker.php

@@ -10,6 +10,7 @@ use Illuminate\Support\Facades\Log;
 use PhpAmqpLib\Exception\AMQPTimeoutException;
 use PhpAmqpLib\Wire\AMQPTable;
 use App\Services\RabbitMQService;
+use App\Exceptions\SectionTimeoutException;
 
 class RabbitMQWorker extends Command
 {
@@ -97,7 +98,10 @@ class RabbitMQWorker extends Command
             try {
                 $this->channel->wait(null, false, $this->timeout);
             } catch (AMQPTimeoutException $e) {
-                // ignore it
+                //忽略
+            } catch (\Exception $e) {
+                $this->error($e->getMessage());
+                throw $e;
             }
 
 
@@ -151,6 +155,9 @@ class RabbitMQWorker extends Command
                 $this->processedCount++;
 
                 $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
+            } catch (SectionTimeoutException $e) {
+                $msg->nack(true, false);
+                Log::warning('attempt to requeue the message message_id:' . $msg->get('message_id'));
             } catch (\Exception $e) {
                 $this->handleJobException($msg, $data, $retryCount, $e);
             }
@@ -176,7 +183,12 @@ class RabbitMQWorker extends Command
         // 根据队列名称创建对应的 Job 实例
         switch ($this->queueName) {
             case 'ai_translate':
-                return new ProcessAITranslateJob($this->queueName, $messageId, $data, $retryCount);
+                return new ProcessAITranslateJob(
+                    $this->queueName,
+                    $messageId,
+                    $data,
+                    $retryCount,
+                );
                 // 可以添加更多队列类型
             default:
                 throw new \Exception("未知的队列类型: {$this->queueName}");