Explorar o código

Merge pull request #2300 from visuddhinanda/development

Development
visuddhinanda hai 10 meses
pai
achega
64784a40b3

+ 1 - 1
api-v8/app/Console/Commands/MqAiTranslate.php

@@ -334,7 +334,7 @@ class MqAiTranslate extends Command
     {
         $url = config('app.url') . '/api/v2/task-status/' . $taskId;
         $data = [
-            'status' => 'done',
+            'status' => $status,
         ];
         Log::debug('ai_translate task status request', ['url' => $url, 'data' => $data]);
         $response = Http::timeout(10)->withToken($token)->patch($url, $data);

+ 35 - 38
api-v8/app/Console/Commands/RabbitMQWorker.php

@@ -10,7 +10,8 @@ use App\Jobs\ProcessAITranslateJob;
 use App\Jobs\BaseRabbitMQJob;
 use Illuminate\Support\Facades\Log;
 use PhpAmqpLib\Exception\AMQPTimeoutException;
-
+use PhpAmqpLib\Wire\AMQPTable;
+use App\Services\RabbitMQService;
 
 class RabbitMQWorker extends Command
 {
@@ -30,7 +31,7 @@ class RabbitMQWorker extends Command
     private $queueConfig;
     private $shouldStop = false;
     private $timeout = 15;
-    public function handle()
+    public function handle(RabbitMQService $consume)
     {
         $this->queueName = $this->argument('queue');
         $this->queueConfig = config("mint.rabbitmq.queues.{$this->queueName}");
@@ -48,8 +49,8 @@ class RabbitMQWorker extends Command
         $this->info("重试次数: {$this->queueConfig['retry_times']}");
 
         try {
-            $this->setupConnection();
-            $this->setupQueues();
+            $consume->setupQueue($this->queueName);
+            $this->channel = $consume->getChannel();
             $this->startConsuming();
         } catch (\Exception $e) {
             $this->error("Worker 启动失败: " . $e->getMessage());
@@ -65,26 +66,14 @@ class RabbitMQWorker extends Command
         return 0;
     }
 
-    private function setupConnection()
-    {
-        $config = config('queue.connections.rabbitmq');
-        $this->connection = new AMQPStreamConnection(
-            $config['host'],
-            $config['port'],
-            $config['user'],
-            $config['password'],
-            $config['virtual_host']
-        );
-
-        $this->channel = $this->connection->channel();
-
-        // 设置 QoS - 每次只处理一条消息
-        $this->channel->basic_qos(null, 1, null);
-    }
-
+    /*
     private function setupQueues()
     {
         // 声明主队列
+        $arguments = new AMQPTable([
+            'x-dead-letter-exchange' => '',
+            'x-dead-letter-routing-key' => $this->queueConfig['dead_letter_queue'], // 死信路由键
+        ]);
         $this->channel->queue_declare(
             $this->queueName,
             false,  // passive
@@ -92,23 +81,21 @@ class RabbitMQWorker extends Command
             false,  // exclusive
             false,  // auto_delete
             false,  // nowait
-            [
-                'x-dead-letter-exchange' => ['S', ''],
-                'x-dead-letter-routing-key' => ['S', $this->queueConfig['dead_letter_queue']]
-            ]
+            $arguments
         );
 
         // 声明死信队列
         $dlqName = $this->queueConfig['dead_letter_queue'];
-        $dlqConfig = config("rabbitmq.dead_letter_queues.{$dlqName}", []);
+        $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
 
         $dlqArgs = [];
         if (isset($dlqConfig['ttl'])) {
-            $dlqArgs['x-message-ttl'] = ['I', $dlqConfig['ttl']];
+            $dlqArgs['x-message-ttl'] =  $dlqConfig['ttl'];
         }
         if (isset($dlqConfig['max_length'])) {
-            $dlqArgs['x-max-length'] = ['I', $dlqConfig['max_length']];
+            $dlqArgs['x-max-length'] =  $dlqConfig['max_length'];
         }
+        $dlqArguments = new AMQPTable($dlqArgs);
 
         $this->channel->queue_declare(
             $dlqName,
@@ -117,12 +104,12 @@ class RabbitMQWorker extends Command
             false,  // exclusive
             false,  // auto_delete
             false,  // nowait
-            $dlqArgs
+            $dlqArguments
         );
 
         $this->info("队列设置完成,死信队列: {$dlqName}");
     }
-
+*/
     private function startConsuming()
     {
         $callback = function (AMQPMessage $msg) {
@@ -170,7 +157,9 @@ class RabbitMQWorker extends Command
     private function processMessage(AMQPMessage $msg)
     {
         try {
-            $data = json_decode($msg->body, true);
+            Log::info('processMessage start', ['message_id' => $msg->get('message_id')]);
+
+            $data = json_decode($msg->getBody(), true);
 
             if (json_last_error() !== JSON_ERROR_NONE) {
                 throw new \Exception("JSON 解析失败: " . json_last_error_msg());
@@ -203,11 +192,14 @@ class RabbitMQWorker extends Command
             Log::error("RabbitMQ 消息处理异常", [
                 'queue' => $this->queueName,
                 'error' => $e->getMessage(),
-                'message_body' => $msg->body
+                'message_body' => $msg->getBody()
             ]);
 
             // 拒绝消息并发送到死信队列
-            $msg->nack(false, false);
+            //$msg->nack(false, false);
+            $this->sendToDeadLetterQueue($data, $e);
+            $msg->ack(); // 确认原消息以避免重复
+            $this->error("已发送到死信队列");
             $this->processedCount++;
         }
     }
@@ -245,17 +237,21 @@ class RabbitMQWorker extends Command
     private function requeueMessage(AMQPMessage $msg, array $data, int $newRetryCount)
     {
         // 添加重试计数到消息头
-        $headers = [
+        // 使用 AMQPTable 包装头部数据
+        $headers = new AMQPTable([
             'retry_count' => $newRetryCount,
             'original_queue' => $this->queueName,
             'retry_timestamp' => time()
-        ];
+        ]);
 
         $newMsg = new AMQPMessage(
-            json_encode($data),
+            json_encode($data, JSON_UNESCAPED_UNICODE),
             [
                 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
-                'application_headers' => $headers
+                'timestamp' => time(),
+                'message_id' => $msg->get('message_id'),
+                'application_headers' => $headers,
+                "content_type" => 'application/json; charset=utf-8'
             ]
         );
 
@@ -299,7 +295,8 @@ class RabbitMQWorker extends Command
         $this->shouldStop = true;
 
         if ($this->channel && $this->channel->is_consuming()) {
-            $this->channel->basic_cancel_on_shutdown(true);
+            //$this->channel->basic_cancel_on_shutdown(true);
+            $this->channel->basic_cancel('');
         }
     }
 

+ 12 - 8
api-v8/app/Console/Commands/TestMq.php

@@ -10,6 +10,7 @@ use App\Models\Discussion;
 use App\Http\Resources\DiscussionResource;
 use App\Models\SentPr;
 use App\Http\Resources\SentPrResource;
+use App\Services\RabbitMQService;
 
 class TestMq extends Command
 {
@@ -19,7 +20,7 @@ class TestMq extends Command
      * @var string
      */
     protected $signature = 'test:mq {--discussion=} {--pr=}';
-
+    protected $publish;
     /**
      * The console command description.
      *
@@ -32,8 +33,9 @@ class TestMq extends Command
      *
      * @return void
      */
-    public function __construct()
+    public function __construct(RabbitMQService $publish)
     {
+        $this->publish = $publish;
         parent::__construct();
     }
 
@@ -44,18 +46,20 @@ class TestMq extends Command
      */
     public function handle()
     {
-        if(\App\Tools\Tools::isStop()){
+        if (\App\Tools\Tools::isStop()) {
             return 0;
         }
-		Mq::publish('hello',['hello world']);
+        $this->publish->publishMessage('ai_translate', ['text' => 'hello']);
+
+        Mq::publish('hello', ['hello world']);
         $discussion = $this->option('discussion');
-        if($discussion && Str::isUuid($discussion)){
-            Mq::publish('discussion',new DiscussionResource(Discussion::find($discussion)));
+        if ($discussion && Str::isUuid($discussion)) {
+            Mq::publish('discussion', new DiscussionResource(Discussion::find($discussion)));
         }
 
         $pr = $this->option('pr');
-        if($pr && Str::isUuid($pr)){
-            Mq::publish('suggestion',new SentPrResource(SentPr::where('uid',$pr)->first()));
+        if ($pr && Str::isUuid($pr)) {
+            Mq::publish('suggestion', new SentPrResource(SentPr::where('uid', $pr)->first()));
         }
 
         return 0;

+ 1 - 1
api-v8/app/Http/Api/Mq.php

@@ -115,7 +115,7 @@ class Mq
         /**
          * @param \PhpAmqpLib\Message\AMQPMessage $message
          */
-        $process_message = function ($message) use ($callback, $queue) {
+        $process_message = function (AMQPMessage $message) use ($callback, $queue) {
             Log::debug('received message', [
                 'message_id' => $message->get('message_id'),
                 'content_type' => $message->get('content_type')

+ 1 - 1
api-v8/app/Http/Api/TemplateRender.php

@@ -192,7 +192,7 @@ class TemplateRender
                 }
             }
             if (!isset($channelInfo)) {
-                Log::error('channel is null');
+                Log::warning('channel is null');
                 $output = [
                     "word" => $word,
                     'innerHtml' => '',

+ 0 - 1
api-v8/app/Http/Controllers/BlogController.php

@@ -31,7 +31,6 @@ class BlogController extends Controller
             ->latest()
             ->paginate(10);
 
-        Log::info($posts[0]->formatted_created_at);
         $categories = $this->categories;
         /*
         $posts = Post::published()

+ 103 - 92
api-v8/app/Http/Controllers/CollectionController.php

@@ -22,123 +22,135 @@ class CollectionController extends Controller
     public function index(Request $request)
     {
 
-		$result=false;
-		$indexCol = ['uid','title','subtitle','summary',
-                      'article_list','owner','status',
-                      'default_channel','lang',
-                      'updated_at','created_at'];
-		switch ($request->get('view')) {
+        $result = false;
+        $indexCol = [
+            'uid',
+            'title',
+            'subtitle',
+            'summary',
+            'article_list',
+            'owner',
+            'status',
+            'default_channel',
+            'lang',
+            'updated_at',
+            'created_at'
+        ];
+        switch ($request->get('view')) {
             case 'studio_list':
-		        $indexCol = ['owner'];
+                $indexCol = ['owner'];
                 //TODO ?
                 $table = Collection::select($indexCol)
-                                    ->selectRaw('count(*) as count')
-                                    ->where('status', 30)
-                                    ->groupBy('owner');
+                    ->selectRaw('count(*) as count')
+                    ->where('status', 30)
+                    ->groupBy('owner');
                 break;
-			case 'studio':
+            case 'studio':
                 $user = AuthApi::current($request);
-                if(!$user){
+                if (!$user) {
                     return $this->error(__('auth.failed'));
                 }
                 $studioId = StudioApi::getIdByName($request->get('name'));
                 //判断当前用户是否有指定的studio的权限
-                if($user['user_uid'] !== $studioId){
+                if ($user['user_uid'] !== $studioId) {
                     return $this->error(__('auth.failed'));
                 }
                 $table = Collection::select($indexCol);
-                if($request->get('view2','my')==='my'){
+                if ($request->get('view2', 'my') === 'my') {
                     $table = $table->where('owner', $studioId);
-                }else{
+                } else {
                     //协作
-                    $resList = ShareApi::getResList($studioId,4);
-                    $resId=[];
+                    $resList = ShareApi::getResList($studioId, 4);
+                    $resId = [];
                     foreach ($resList as $res) {
                         $resId[] = $res['res_id'];
                     }
-                    $table = $table->whereIn('uid', $resId)->where('owner','<>', $studioId);
+                    $table = $table->whereIn('uid', $resId)->where('owner', '<>', $studioId);
                 }
-				break;
-			case 'public':
+                break;
+            case 'public':
                 //全网公开
-				$table = Collection::select($indexCol)->where('status', 30);
-                if($request->has('studio')){
+                $table = Collection::select($indexCol)->where('status', 30);
+                if ($request->has('studio')) {
                     $studioId = StudioApi::getIdByName($request->get('studio'));
-                    $table = $table->where('owner',$studioId);
+                    $table = $table->where('owner', $studioId);
                 }
-				break;
-			default:
-				# code...
-			    return $this->error("无法识别的view参数",200,200);
-				break;
-		}
-        if($request->has("search") && !empty($request->has("search"))){
-            $table = $table->where('title', 'like', "%".$request->get("search")."%");
+                break;
+            default:
+                # code...
+                return $this->error("无法识别的view参数", 200, 200);
+                break;
+        }
+        if ($request->has("search") && !empty($request->has("search"))) {
+            $table = $table->where('title', 'like', "%" . $request->get("search") . "%");
         }
         $count = $table->count();
-        if($request->has("order") && $request->has("dir")){
-            $table = $table->orderBy($request->get("order"),$request->get("dir"));
-        }else{
-            if($request->get('view') === 'studio_list'){
-                $table = $table->orderBy('count','desc');
-            }else{
-                $table = $table->orderBy('updated_at','desc');
+        if ($request->has("order") && $request->has("dir")) {
+            $table = $table->orderBy($request->get("order"), $request->get("dir"));
+        } else {
+            if ($request->get('view') === 'studio_list') {
+                $table = $table->orderBy('count', 'desc');
+            } else {
+                $table = $table->orderBy('updated_at', 'desc');
             }
         }
 
-        $table = $table->skip($request->get("offset",0))
-                       ->take($request->get("limit",1000));
+        $table = $table->skip($request->get("offset", 0))
+            ->take($request->get("limit", 1000));
 
         $result = $table->get();
-		return $this->ok(["rows"=>CollectionResource::collection($result),"count"=>$count]);
+        return $this->ok(["rows" => CollectionResource::collection($result), "count" => $count]);
     }
 
-            /**
+    /**
      * Display a listing of the resource.
      *
      * @return \Illuminate\Http\Response
      */
-    public function showMyNumber(Request $request){
+    public function showMyNumber(Request $request)
+    {
         $user = AuthApi::current($request);
-        if(!$user){
+        if (!$user) {
             return $this->error(__('auth.failed'));
         }
         //判断当前用户是否有指定的studio的权限
         $studioId = StudioApi::getIdByName($request->get('studio'));
-        if($user['user_uid'] !== $studioId){
+        if ($user['user_uid'] !== $studioId) {
             return $this->error(__('auth.failed'));
         }
         //我的
         $my = Collection::where('owner', $studioId)->count();
         //协作
-        $resList = ShareApi::getResList($studioId,4);
-        $resId=[];
+        $resList = ShareApi::getResList($studioId, 4);
+        $resId = [];
         foreach ($resList as $res) {
             $resId[] = $res['res_id'];
         }
-        $collaboration = Collection::whereIn('uid', $resId)->where('owner','<>', $studioId)->count();
+        $collaboration = Collection::whereIn('uid', $resId)->where('owner', '<>', $studioId)->count();
 
-        return $this->ok(['my'=>$my,'collaboration'=>$collaboration]);
+        return $this->ok(['my' => $my, 'collaboration' => $collaboration]);
     }
 
-    public static function UserCanEdit($user_uid,$collection){
-        if($collection->owner === $user_uid){
+    public static function UserCanEdit($user_uid, $collection)
+    {
+        if ($collection->owner === $user_uid) {
             return true;
         }
         //查协作
-        $currPower = ShareApi::getResPower($user_uid,$collection->uid);
-        if($currPower >= 20){
+        $currPower = ShareApi::getResPower($user_uid, $collection->uid);
+        if ($currPower >= 20) {
             return true;
         }
         return false;
     }
-    public static function UserCanRead($user_uid,$collection){
-        if($collection->owner === $user_uid){
+    public static function UserCanRead($user_uid, $collection)
+    {
+        if ($collection->owner === $user_uid) {
             return true;
         }
         //查协作
-        $currPower = ShareApi::getResPower($user_uid,$collection->uid);
-        if($currPower >= 10){
+        $currPower = ShareApi::getResPower($user_uid, $collection->uid);
+        if ($currPower >= 10) {
             return true;
         }
         return false;
@@ -152,17 +164,17 @@ class CollectionController extends Controller
     public function store(Request $request)
     {
         $user = \App\Http\Api\AuthApi::current($request);
-        if(!$user){
-            return $this->error(__('auth.failed'),401,401);
+        if (!$user) {
+            return $this->error(__('auth.failed'), 401, 401);
         }
         //判断当前用户是否有指定的studio的权限
-        if($user['user_uid'] !== \App\Http\Api\StudioApi::getIdByName($request->get('studio'))){
-            return $this->error(__('auth.failed'),403,403);
+        if ($user['user_uid'] !== \App\Http\Api\StudioApi::getIdByName($request->get('studio'))) {
+            return $this->error(__('auth.failed'), 403, 403);
         }
         //查询是否重复
-        if(Collection::where('title',$request->get('title'))->where('owner',$user['user_uid'])->exists()){
-            return $this->error(__('validation.exists'),200,200);
-        }else{
+        if (Collection::where('title', $request->get('title'))->where('owner', $user['user_uid'])->exists()) {
+            return $this->error(__('validation.exists'), 200, 200);
+        } else {
             $newOne = new Collection;
             $newOne->id = app('snowflake')->id();
             $newOne->uid = Str::uuid();
@@ -172,8 +184,8 @@ class CollectionController extends Controller
             $newOne->owner = $user['user_uid'];
             $newOne->owner_id = $user['user_id'];
             $newOne->editor_id = $user['user_id'];
-            $newOne->create_time = time()*1000;
-            $newOne->modify_time = time()*1000;
+            $newOne->create_time = time() * 1000;
+            $newOne->modify_time = time() * 1000;
             $newOne->save();
             return $this->ok(new CollectionResource($newOne));
         }
@@ -185,33 +197,32 @@ class CollectionController extends Controller
      * @param  string  $id
      * @return \Illuminate\Http\Response
      */
-    public function show(Request  $request,$id)
+    public function show(Request  $request, $id)
     {
-		$result  = Collection::where('uid', $id)->first();
-		if(!$result){
-            return $this->error("没有查询到数据");
+        $result  = Collection::where('uid', $id)->first();
+        if (!$result) {
+            return $this->warning("没有查询到数据 id={$id}");
         }
-        if($result->status<30){
+        if ($result->status < 30) {
             //私有文章,判断权限
-            Log::error('私有文章,判断权限'.$id);
+            Log::info('私有文章,判断权限' . $id);
             $user = \App\Http\Api\AuthApi::current($request);
-            if(!$user){
-                Log::error('未登录');
-                return $this->error(__('auth.failed'),401,401);
+            if (!$user) {
+                Log::warning('未登录');
+                return $this->error(__('auth.failed'), 403, 403);
             }
             //判断当前用户是否有指定的studio的权限
-            if($user['user_uid'] !== $result->owner){
-                Log::error($user["user_uid"].'私有文章,判断权限'.$id);
+            if ($user['user_uid'] !== $result->owner) {
+                Log::info($user["user_uid"] . '私有文章,判断权限' . $id);
                 //非所有者
-                if(CollectionController::UserCanRead($user['user_uid'],$result)===false){
-                    Log::error($user["user_uid"].'没有读取权限');
-                    return $this->error(__('auth.failed'),403,403);
+                if (CollectionController::UserCanRead($user['user_uid'], $result) === false) {
+                    Log::warning($user["user_uid"] . '没有读取权限');
+                    return $this->error(__('auth.failed'), 403, 403);
                 }
             }
         }
         $result->fullArticleList = true;
         return $this->ok(new CollectionResource($result));
-
     }
 
     /**
@@ -225,27 +236,27 @@ class CollectionController extends Controller
     {
         //
         $collection  = Collection::find($id);
-        if(!$collection){
+        if (!$collection) {
             return $this->error("no recorder");
         }
         //鉴权
         $user = AuthApi::current($request);
-        if(!$user){
-            return $this->error(__('auth.failed'),401,401);
+        if (!$user) {
+            return $this->error(__('auth.failed'), 401, 401);
         }
-        if(!CollectionController::UserCanEdit($user["user_uid"],$collection)){
-            return $this->error(__('auth.failed'),403,403);
+        if (!CollectionController::UserCanEdit($user["user_uid"], $collection)) {
+            return $this->error(__('auth.failed'), 403, 403);
         }
         $collection->title = $request->get('title');
         $collection->subtitle = $request->get('subtitle');
         $collection->summary = $request->get('summary');
-        if($request->has('aritcle_list')){
+        if ($request->has('aritcle_list')) {
             $collection->article_list = \json_encode($request->get('aritcle_list'));
         }
         $collection->lang = $request->get('lang');
         $collection->status = $request->get('status');
         $collection->default_channel = $request->get('default_channel');
-        $collection->modify_time = time()*1000;
+        $collection->modify_time = time() * 1000;
         $collection->save();
         return $this->ok(new CollectionResource($collection));
     }
@@ -256,20 +267,20 @@ class CollectionController extends Controller
      * @param  string  $id
      * @return \Illuminate\Http\Response
      */
-    public function destroy(Request $request,string $id)
+    public function destroy(Request $request, string $id)
     {
         //
         $user = AuthApi::current($request);
-        if(!$user){
+        if (!$user) {
             return $this->error(__('auth.failed'));
         }
         //判断当前用户是否有指定的studio的权限
         $collection = Collection::find($id);
-        if($user['user_uid'] !== $collection['owner']){
+        if ($user['user_uid'] !== $collection['owner']) {
             return $this->error(__('auth.failed'));
         }
         $delete = 0;
-        DB::transaction(function() use($collection,$delete){
+        DB::transaction(function () use ($collection, $delete) {
             //TODO 删除文集中的文章
             $delete = $collection->delete();
         });

+ 2 - 2
api-v8/app/Jobs/BaseRabbitMQJob.php

@@ -27,12 +27,12 @@ abstract class BaseRabbitMQJob implements ShouldQueue
         $this->currentRetryCount = $retryCount;
 
         // 从配置读取重试次数和超时时间
-        $queueConfig = config("rabbitmq.queues.{$queueName}");
+        $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
         $this->tries = $queueConfig['retry_times'] ?? 3;
         $this->timeout = $queueConfig['timeout'] ?? 300;
     }
 
-    public function handle()
+    public function handle($messageId = null)
     {
         try {
             Log::info("开始处理队列消息", [

+ 1 - 0
api-v8/app/Services/AiTranslateSerevice.php → api-v8/app/Services/AiTranslateService.php

@@ -12,6 +12,7 @@ class AiTranslateService
 
     public function processTranslate(array $translateData): array
     {
+        $a = $translateData['count'] / 10;
         Log::debug('AiTranslateService processOrder', $translateData);
         return [];
     }

+ 253 - 0
api-v8/app/Services/RabbitMQService.php

@@ -0,0 +1,253 @@
+<?php
+
+namespace App\Services;
+
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Channel\AMQPChannel;
+use PhpAmqpLib\Message\AMQPMessage;
+use PhpAmqpLib\Wire\AMQPTable;
+use Illuminate\Support\Facades\Log;
+
+class RabbitMQService
+{
+    private $connection;
+    private $channel;
+    private $config;
+
+    public function __construct()
+    {
+        $this->config = config('queue.connections.rabbitmq');
+        $this->connect();
+    }
+
+    private function connect()
+    {
+        $conn = $this->config;
+        $this->connection = new AMQPStreamConnection(
+            $conn['host'],
+            $conn['port'],
+            $conn['user'],
+            $conn['password'],
+            $conn['virtual_host']
+        );
+
+        $this->channel = $this->connection->channel();
+
+        // 设置 QoS - 每次只处理一条消息
+        $this->channel->basic_qos(null, 1, null);
+    }
+
+    public function getChannel(): AMQPChannel
+    {
+        return $this->channel;
+    }
+
+    public function setupQueue(string $queueName): void
+    {
+        $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
+
+
+
+        // 创建死信交换机
+        $this->channel->exchange_declare(
+            $queueConfig['dead_letter_exchange'],
+            'direct',
+            false,
+            true,
+            false
+        );
+
+        $dlqName = $queueConfig['dead_letter_queue'];
+        $dlqConfig = config("mint.rabbitmq.dead_letter_queues.{$dlqName}", []);
+        $dlqArgs = [];
+        if (isset($dlqConfig['ttl'])) {
+            $dlqArgs['x-message-ttl'] =  $dlqConfig['ttl'];
+        }
+        if (isset($dlqConfig['max_length'])) {
+            $dlqArgs['x-max-length'] =  $dlqConfig['max_length'];
+        }
+        $dlqArguments = new AMQPTable($dlqArgs);
+
+        // 创建死信队列
+        $this->channel->queue_declare(
+            $dlqName,
+            false,  // passive
+            true,   // durable
+            false,  // exclusive
+            false,  // auto_delete
+            false,  // nowait
+            $dlqArguments
+        );
+
+        // 绑定死信队列到死信交换机
+        $this->channel->queue_bind(
+            $queueConfig['dead_letter_queue'],
+            $queueConfig['dead_letter_exchange']
+        );
+
+        // 创建主队列,配置死信
+        $arguments = new AMQPTable([
+            'x-dead-letter-exchange' => $queueConfig['dead_letter_exchange'],
+            'x-dead-letter-routing-key' => $queueConfig['dead_letter_queue'], // 死信路由键
+        ]);
+
+        $this->channel->queue_declare(
+            $queueName,
+            false,  // passive
+            true,   // durable
+            false,  // exclusive
+            false,  // auto_delete
+            false,  // nowait
+            $arguments
+        );
+    }
+
+    public function publishMessage(string $queueName, array $data): bool
+    {
+        try {
+            $this->setupQueue($queueName);
+
+            $message = new AMQPMessage(
+                json_encode($data, JSON_UNESCAPED_UNICODE),
+                [
+                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+                    'timestamp' => time(),
+                    'message_id' => uniqid(),
+                    "content_type" => 'application/json; charset=utf-8'
+                ]
+            );
+
+            $this->channel->basic_publish($message, '', $queueName);
+
+            Log::info("Message published to queue: {$queueName}", $data);
+            return true;
+        } catch (\Exception $e) {
+            Log::error("Failed to publish message to queue: {$queueName}", [
+                'error' => $e->getMessage(),
+                'data' => $data
+            ]);
+            return false;
+        }
+    }
+
+    public function consume(string $queueName, callable $callback, int $maxIterations = null): void
+    {
+        $this->setupQueue($queueName);
+        $maxIterations = $maxIterations ?? $this->config['consumer']['max_iterations'];
+        $iteration = 0;
+
+        $consumerCallback = function (AMQPMessage $msg) use ($callback, $queueName, &$iteration) {
+            try {
+                $data = json_decode($msg->getBody(), true);
+                $retryCount = $this->getRetryCount($msg);
+                $maxRetries = $this->config['queues'][$queueName]['retry_count'];
+
+                Log::info("Processing message from queue: {$queueName}", [
+                    'data' => $data,
+                    'retry_count' => $retryCount,
+                    'delivery_tag' => $msg->getDeliveryTag()
+                ]);
+
+                // 执行回调处理消息
+                $result = $callback($data, $retryCount);
+
+                if ($result === true) {
+                    // 处理成功,确认消息
+                    $msg->ack();
+                    Log::info("Message processed successfully", ['delivery_tag' => $msg->getDeliveryTag()]);
+                } else {
+                    // 处理失败,检查重试次数
+                    if ($retryCount < $maxRetries) {
+                        // 重新入队,延迟处理
+                        $this->requeueWithDelay($msg, $queueName, $retryCount + 1);
+                        Log::warning("Message requeued for retry", [
+                            'delivery_tag' => $msg->getDeliveryTag(),
+                            'retry_count' => $retryCount + 1
+                        ]);
+                    } else {
+                        // 超过重试次数,拒绝消息(进入死信队列)
+                        $msg->nack(false, false);
+                        Log::error("Message rejected after max retries", [
+                            'delivery_tag' => $msg->getDeliveryTag(),
+                            'retry_count' => $retryCount
+                        ]);
+                    }
+                }
+            } catch (\Exception $e) {
+                Log::error("Error processing message", [
+                    'error' => $e->getMessage(),
+                    'delivery_tag' => $msg->getDeliveryTag()
+                ]);
+                $msg->nack(false, false);
+            }
+
+            $iteration++;
+        };
+
+        $this->channel->basic_qos(null, 1, null);
+        $this->channel->basic_consume($queueName, '', false, false, false, false, $consumerCallback);
+
+        Log::info("Starting consumer for queue: {$queueName}", ['max_iterations' => $maxIterations]);
+
+        while ($this->channel->is_consuming() && $iteration < $maxIterations) {
+            $this->channel->wait(null, false, $this->config['consumer']['sleep_between_iterations']);
+        }
+
+        Log::info("Consumer stopped", ['iterations_processed' => $iteration]);
+    }
+
+    private function getRetryCount(AMQPMessage $msg): int
+    {
+        $headers = $msg->get_properties();
+        return isset($headers['application_headers']['x-retry-count'])
+            ? $headers['application_headers']['x-retry-count'] : 0;
+    }
+
+    private function requeueWithDelay(AMQPMessage $msg, string $queueName, int $retryCount): void
+    {
+        $delay = $this->config['queues'][$queueName]['retry_delay'];
+
+        // 创建延迟队列
+        $delayQueueName = "{$queueName}_delay_{$retryCount}";
+        $arguments = new AMQPTable([
+            'x-message-ttl' => $delay,
+            'x-dead-letter-exchange' => '',
+            'x-dead-letter-routing-key' => $queueName,
+        ]);
+
+        $this->channel->queue_declare(
+            $delayQueueName,
+            false,
+            true,
+            false,
+            false,
+            false,
+            $arguments
+        );
+
+        // 重新发布消息到延迟队列
+        $data = json_decode($msg->getBody(), true);
+        $newMessage = new AMQPMessage(
+            json_encode($data),
+            [
+                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+                'application_headers' => new AMQPTable([
+                    'x-retry-count' => $retryCount
+                ])
+            ]
+        );
+
+        $this->channel->basic_publish($newMessage, '', $delayQueueName);
+        $msg->ack();
+    }
+
+    public function close(): void
+    {
+        if ($this->channel) {
+            $this->channel->close();
+        }
+        if ($this->connection) {
+            $this->connection->close();
+        }
+    }
+}

+ 4 - 3
api-v8/config/mint.php

@@ -127,10 +127,11 @@ return [
     'rabbitmq' => [
         'queues' => [
             'ai_translate' => [
-                'retry_times' => env('RABBITMQ_ORDERS_RETRY_TIMES', 3),
-                'max_loop_count' => env('RABBITMQ_ORDERS_MAX_LOOP', 10),
-                'timeout' => env('RABBITMQ_ORDERS_TIMEOUT', 300),
+                'retry_times' => env('RABBITMQ_AI_RETRY_TIMES', 3),
+                'max_loop_count' => env('RABBITMQ_AI_MAX_LOOP', 10),
+                'timeout' => env('RABBITMQ_AI_TIMEOUT', 300),
                 'dead_letter_queue' => 'ai_translate_dlq',
+                'dead_letter_exchange' => 'ai_translate_dlx',
             ],
         ],