visuddhinanda 10 luni în urmă
părinte
comite
1b92ffd97b

+ 92 - 0
api-v8/app/Console/Commands/ProcessDeadLetterQueue.php

@@ -0,0 +1,92 @@
+<?php
+
+namespace App\Console\Commands;
+
+use Illuminate\Console\Command;
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+use Illuminate\Support\Facades\Log;
+
+class ProcessDeadLetterQueue extends Command
+{
+    /**
+     * The name and signature of the console command.
+     * 查看死信队列消息
+     * php artisan rabbitmq:process-dlq orders_dlq
+     *
+     * 重新入队死信消息
+     * php artisan rabbitmq:process-dlq orders_dlq --requeue
+     *
+     * 删除死信消息
+     * php artisan rabbitmq:process-dlq orders_dlq --delete
+     * @var string
+     */
+    protected $signature = 'rabbitmq:process-dlq {dlq_name} {--requeue} {--delete}';
+    protected $description = '处理死信队列中的消息';
+
+    public function handle()
+    {
+        $dlqName = $this->argument('dlq_name');
+        $requeue = $this->option('requeue');
+        $delete = $this->option('delete');
+
+        $config = config('rabbitmq.connection');
+        $connection = new AMQPStreamConnection(
+            $config['host'],
+            $config['port'],
+            $config['user'],
+            $config['password'],
+            $config['vhost']
+        );
+
+        $channel = $connection->channel();
+
+        $this->info("开始处理死信队列: {$dlqName}");
+
+        $messageCount = 0;
+
+        while (true) {
+            $msg = $channel->basic_get($dlqName, false);
+
+            if (!$msg) {
+                break; // 队列为空
+            }
+
+            $messageCount++;
+            $data = json_decode($msg->body, true);
+
+            $this->info("处理第 {$messageCount} 条死信消息");
+            $this->line("原始队列: " . ($data['queue'] ?? 'unknown'));
+            $this->line("失败原因: " . ($data['failure_reason'] ?? 'unknown'));
+            $this->line("失败时间: " . ($data['failed_at'] ?? 'unknown'));
+
+            if ($requeue) {
+                // 重新入队到原始队列
+                $originalQueue = $data['queue'] ?? null;
+                if ($originalQueue && isset($data['original_message'])) {
+                    $requeueMsg = new AMQPMessage(
+                        json_encode($data['original_message']),
+                        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
+                    );
+
+                    $channel->basic_publish($requeueMsg, '', $originalQueue);
+                    $this->info("消息已重新入队到: {$originalQueue}");
+                }
+            }
+
+            if ($delete || $requeue) {
+                $msg->ack();
+            } else {
+                // 只是查看,不删除
+                $msg->nack(false, true);
+            }
+        }
+
+        $this->info("死信队列处理完成,共处理 {$messageCount} 条消息");
+
+        $channel->close();
+        $connection->close();
+
+        return 0;
+    }
+}

+ 321 - 0
api-v8/app/Console/Commands/RabbitMQWorker.php

@@ -0,0 +1,321 @@
+<?php
+
+namespace App\Console\Commands;
+
+use Illuminate\Console\Command;
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+use PhpAmqpLib\Channel\AMQPChannel;
+use App\Jobs\ProcessAITranslateJob;
+use App\Jobs\BaseRabbitMQJob;
+use Illuminate\Support\Facades\Log;
+use PhpAmqpLib\Exception\AMQPTimeoutException;
+
+
+class RabbitMQWorker extends Command
+{
+    /**
+     * The name and signature of the console command.
+     * php artisan rabbitmq:consume ai_translate
+     * @var string
+     */
+    protected $signature = 'rabbitmq:consume {queue} {--reset-loop-count}';
+    protected $description = '消费 RabbitMQ 队列消息';
+
+    private $connection;
+    private $channel;
+    private $processedCount = 0;
+    private $maxLoopCount = 0;
+    private $queueName;
+    private $queueConfig;
+    private $shouldStop = false;
+    private $timeout = 15;
+    public function handle()
+    {
+        $this->queueName = $this->argument('queue');
+        $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
+
+        if (!$this->queueConfig) {
+            $this->error("队列 {$this->queueName} 的配置不存在");
+            return 1;
+        }
+
+        $this->maxLoopCount = $this->queueConfig['max_loop_count'];
+
+        $this->info("启动 RabbitMQ Worker");
+        $this->info("队列: {$this->queueName}");
+        $this->info("最大循环次数: {$this->maxLoopCount}");
+        $this->info("重试次数: {$this->queueConfig['retry_times']}");
+
+        try {
+            $this->setupConnection();
+            $this->setupQueues();
+            $this->startConsuming();
+        } catch (\Exception $e) {
+            $this->error("Worker 启动失败: " . $e->getMessage());
+            Log::error("RabbitMQ Worker 启动失败", [
+                'queue' => $this->queueName,
+                'error' => $e->getMessage()
+            ]);
+            return 1;
+        } finally {
+            $this->cleanup();
+        }
+
+        return 0;
+    }
+
+    private function setupConnection()
+    {
+        $config = config('queue.connections.rabbitmq');
+        $this->connection = new AMQPStreamConnection(
+            $config['host'],
+            $config['port'],
+            $config['user'],
+            $config['password'],
+            $config['virtual_host']
+        );
+
+        $this->channel = $this->connection->channel();
+
+        // 设置 QoS - 每次只处理一条消息
+        $this->channel->basic_qos(null, 1, null);
+    }
+
+    private function setupQueues()
+    {
+        // 声明主队列
+        $this->channel->queue_declare(
+            $this->queueName,
+            false,  // passive
+            true,   // durable
+            false,  // exclusive
+            false,  // auto_delete
+            false,  // nowait
+            [
+                'x-dead-letter-exchange' => ['S', ''],
+                'x-dead-letter-routing-key' => ['S', $this->queueConfig['dead_letter_queue']]
+            ]
+        );
+
+        // 声明死信队列
+        $dlqName = $this->queueConfig['dead_letter_queue'];
+        $dlqConfig = config("rabbitmq.dead_letter_queues.{$dlqName}", []);
+
+        $dlqArgs = [];
+        if (isset($dlqConfig['ttl'])) {
+            $dlqArgs['x-message-ttl'] = ['I', $dlqConfig['ttl']];
+        }
+        if (isset($dlqConfig['max_length'])) {
+            $dlqArgs['x-max-length'] = ['I', $dlqConfig['max_length']];
+        }
+
+        $this->channel->queue_declare(
+            $dlqName,
+            false,  // passive
+            true,   // durable
+            false,  // exclusive
+            false,  // auto_delete
+            false,  // nowait
+            $dlqArgs
+        );
+
+        $this->info("队列设置完成,死信队列: {$dlqName}");
+    }
+
+    private function startConsuming()
+    {
+        $callback = function (AMQPMessage $msg) {
+            $this->processMessage($msg);
+        };
+
+        $this->channel->basic_consume(
+            $this->queueName,
+            '',     // consumer_tag
+            false,  // no_local
+            false,  // no_ack
+            false,  // exclusive
+            false,  // nowait
+            $callback
+        );
+
+        $this->info("开始消费消息... 按 Ctrl+C 退出");
+
+        // 设置信号处理
+        if (extension_loaded('pcntl')) {
+            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
+            pcntl_signal(SIGINT, [$this, 'handleSignal']);
+        }
+
+        while ($this->channel->is_consuming() && !$this->shouldStop) {
+            try {
+                $this->channel->wait(null, false, $this->timeout);
+            } catch (AMQPTimeoutException $e) {
+                // ignore it
+            }
+
+
+            if (extension_loaded('pcntl')) {
+                pcntl_signal_dispatch();
+            }
+
+            // 检查是否达到最大循环次数
+            if ($this->processedCount >= $this->maxLoopCount) {
+                $this->info("达到最大循环次数 ({$this->maxLoopCount}),Worker 自动退出");
+                break;
+            }
+        }
+    }
+
+    private function processMessage(AMQPMessage $msg)
+    {
+        try {
+            $data = json_decode($msg->body, true);
+
+            if (json_last_error() !== JSON_ERROR_NONE) {
+                throw new \Exception("JSON 解析失败: " . json_last_error_msg());
+            }
+
+            // 获取重试次数(从消息头中获取)
+            $retryCount = 0;
+            if ($msg->has('application_headers')) {
+                $headers = $msg->get('application_headers')->getNativeData();
+                $retryCount = $headers['retry_count'] ?? 0;
+            }
+
+            // 根据队列类型创建对应的 Job
+            $job = $this->createJob($data, $retryCount);
+
+            try {
+                // 执行业务逻辑
+                $job->handle();
+
+                // 成功处理,确认消息
+                $msg->ack();
+                $this->processedCount++;
+
+                $this->info("消息处理成功 [{$this->processedCount}/{$this->maxLoopCount}]");
+            } catch (\Exception $e) {
+                $this->handleJobException($msg, $data, $retryCount, $e);
+            }
+        } catch (\Exception $e) {
+            $this->error("消息处理异常: " . $e->getMessage());
+            Log::error("RabbitMQ 消息处理异常", [
+                'queue' => $this->queueName,
+                'error' => $e->getMessage(),
+                'message_body' => $msg->body
+            ]);
+
+            // 拒绝消息并发送到死信队列
+            $msg->nack(false, false);
+            $this->processedCount++;
+        }
+    }
+
+    private function createJob(array $data, int $retryCount): BaseRabbitMQJob
+    {
+        // 根据队列名称创建对应的 Job 实例
+        switch ($this->queueName) {
+            case 'ai_translate':
+                return new ProcessAITranslateJob($this->queueName, $data, $retryCount);
+                // 可以添加更多队列类型
+            default:
+                throw new \Exception("未知的队列类型: {$this->queueName}");
+        }
+    }
+
+    private function handleJobException(AMQPMessage $msg, array $data, int $retryCount, \Exception $e)
+    {
+        $maxRetries = $this->queueConfig['retry_times'];
+
+        if ($retryCount < $maxRetries - 1) {
+            // 还有重试机会,重新入队
+            $this->requeueMessage($msg, $data, $retryCount + 1);
+            $this->info("消息重新入队,重试次数: " . ($retryCount + 1) . "/{$maxRetries}");
+        } else {
+            // 超过重试次数,发送到死信队列
+            $this->sendToDeadLetterQueue($data, $e);
+            $msg->ack(); // 确认原消息以避免重复
+            $this->error("消息超过最大重试次数,已发送到死信队列");
+        }
+
+        $this->processedCount++;
+    }
+
+    private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
+    {
+        // 添加重试计数到消息头
+        $headers = [
+            'retry_count' => $newRetryCount,
+            'original_queue' => $this->queueName,
+            'retry_timestamp' => time()
+        ];
+
+        $newMsg = new AMQPMessage(
+            json_encode($data),
+            [
+                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+                'application_headers' => $headers
+            ]
+        );
+
+        // 发布到同一队列
+        $this->channel->basic_publish($newMsg, '', $this->queueName);
+
+        // 确认原消息
+        $msg->ack();
+    }
+
+    private function sendToDeadLetterQueue(array $data, \Exception $e)
+    {
+        $dlqName = $this->queueConfig['dead_letter_queue'];
+
+        $dlqData = [
+            'original_message' => $data,
+            'failure_reason' => $e->getMessage(),
+            'failed_at' => date('Y-m-d H:i:s'),
+            'queue' => $this->queueName,
+            'max_retries' => $this->queueConfig['retry_times']
+        ];
+
+        $dlqMsg = new AMQPMessage(
+            json_encode($dlqData),
+            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
+        );
+
+        $this->channel->basic_publish($dlqMsg, '', $dlqName);
+
+        Log::error("消息发送到死信队列", [
+            'original_queue' => $this->queueName,
+            'dead_letter_queue' => $dlqName,
+            'data' => $data,
+            'error' => $e->getMessage()
+        ]);
+    }
+
+    public function handleSignal($signal)
+    {
+        $this->info("接收到退出信号,正在优雅关闭...");
+        $this->shouldStop = true;
+
+        if ($this->channel && $this->channel->is_consuming()) {
+            $this->channel->basic_cancel_on_shutdown(true);
+        }
+    }
+
+    private function cleanup()
+    {
+        try {
+            if ($this->channel) {
+                $this->channel->close();
+            }
+            if ($this->connection) {
+                $this->connection->close();
+            }
+
+            $this->info("连接已关闭,处理了 {$this->processedCount} 条消息");
+        } catch (\Exception $e) {
+            $this->error("清理资源时出错: " . $e->getMessage());
+        }
+    }
+}

+ 107 - 0
api-v8/app/Jobs/BaseRabbitMQJob.php

@@ -0,0 +1,107 @@
+<?php
+
+namespace App\Jobs;
+
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use Illuminate\Support\Facades\Log;
+use Illuminate\Support\Facades\Config;
+
+abstract class BaseRabbitMQJob implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    protected $queueName;
+    protected $messageData;
+    protected $currentRetryCount = 0;
+    protected $tries = 0;
+    protected $timeout = 0;
+
+    public function __construct(string $queueName, array $messageData, int $retryCount = 0)
+    {
+        $this->queueName = $queueName;
+        $this->messageData = $messageData;
+        $this->currentRetryCount = $retryCount;
+
+        // 从配置读取重试次数和超时时间
+        $queueConfig = config("rabbitmq.queues.{$queueName}");
+        $this->tries = $queueConfig['retry_times'] ?? 3;
+        $this->timeout = $queueConfig['timeout'] ?? 300;
+    }
+
+    public function handle()
+    {
+        try {
+            Log::info("开始处理队列消息", [
+                'queue' => $this->queueName,
+                'message_id' => $this->messageData['id'] ?? 'unknown',
+                'retry_count' => $this->currentRetryCount
+            ]);
+
+            // 调用子类的具体业务逻辑
+            $result = $this->processMessage($this->messageData);
+
+            Log::info("队列消息处理完成", [
+                'queue' => $this->queueName,
+                'message_id' => $this->messageData['id'] ?? 'unknown',
+                'result' => $result
+            ]);
+
+            return $result;
+        } catch (\Exception $e) {
+            Log::error("队列消息处理失败", [
+                'queue' => $this->queueName,
+                'message_id' => $this->messageData['id'] ?? 'unknown',
+                'error' => $e->getMessage(),
+                'retry_count' => $this->currentRetryCount,
+                'max_retries' => $this->tries
+            ]);
+
+            // 如果达到最大重试次数,处理失败逻辑
+            if ($this->currentRetryCount >= $this->tries - 1) {
+                $this->handleFinalFailure($this->messageData, $e);
+            }
+
+            throw $e; // 重新抛出异常以触发重试
+        }
+    }
+
+    public function failed(\Exception $exception)
+    {
+        Log::error("队列消息最终失败", [
+            'queue' => $this->queueName,
+            'message_id' => $this->messageData['id'] ?? 'unknown',
+            'error' => $exception->getMessage(),
+            'retry_count' => $this->currentRetryCount
+        ]);
+
+        // 发送到死信队列的逻辑将在 Worker 中处理
+    }
+
+    // 子类需要实现的具体业务逻辑
+    abstract protected function processMessage(array $messageData);
+
+    // 子类可以重写的失败处理逻辑
+    protected function handleFinalFailure(array $messageData, \Exception $exception)
+    {
+        // 默认实现:记录日志
+        Log::error("消息处理最终失败,准备发送到死信队列", [
+            'queue' => $this->queueName,
+            'message_data' => $messageData,
+            'error' => $exception->getMessage()
+        ]);
+    }
+
+    public function getQueueName(): string
+    {
+        return $this->queueName;
+    }
+
+    public function getCurrentRetryCount(): int
+    {
+        return $this->currentRetryCount;
+    }
+}

+ 35 - 0
api-v8/app/Jobs/ProcessAITranslateJob.php

@@ -0,0 +1,35 @@
+<?php
+
+namespace App\Jobs;
+
+use App\Services\AiTranslateService;
+use Illuminate\Support\Facades\Log;
+
+class ProcessAITranslateJob extends BaseRabbitMQJob
+{
+    protected function processMessage(array $messageData)
+    {
+        $startTime = microtime(true);
+        try {
+            $translateService = app(AiTranslateService::class);
+            return $translateService->processTranslate($messageData);
+        } catch (\Exception $e) {
+            // 记录失败指标
+
+            throw $e;
+        } finally {
+            // 记录处理时间
+            $processingTime = microtime(true) - $startTime;
+            Log::info('翻译处理耗时', ['time' => $processingTime]);
+        }
+    }
+
+    protected function handleFinalFailure(array $messageData, \Exception $exception)
+    {
+        parent::handleFinalFailure($messageData, $exception);
+
+        // 订单特定的失败处理
+        $orderService = app(AiTranslateService::class);
+        $orderService->handleFailedTranslate($messageData, $exception);
+    }
+}

+ 28 - 0
api-v8/app/Services/AiTranslateSerevice.php

@@ -0,0 +1,28 @@
+<?php
+
+namespace App\Services;
+
+use Illuminate\Support\Facades\Log;
+
+class AiTranslateService
+{
+
+
+    public function __construct() {}
+
+    public function processTranslate(array $translateData): array
+    {
+        Log::debug('AiTranslateService processOrder', $translateData);
+        return [];
+    }
+
+    public function handleFailedTranslate(array $translateData, \Exception $exception): void
+    {
+        try {
+            //失败时的业务逻辑
+            // 发送失败通知
+        } catch (\Exception $e) {
+            Log::error('处理失败订单时出错', ['error' => $e->getMessage()]);
+        }
+    }
+}