Просмотр исходного кода

mq push message 输出 queue name

visuddhinanda 1 год назад
Родитель
Сommit
59b88e201f
1 измененных файлов с 26 добавлено и 20 удалено
  1. 26 20
      api-v8/app/Http/Api/Mq.php

+ 26 - 20
api-v8/app/Http/Api/Mq.php

@@ -5,6 +5,7 @@ namespace App\Http\Api;
 use PhpAmqpLib\Connection\AMQPStreamConnection;
 use PhpAmqpLib\Message\AMQPMessage;
 use PhpAmqpLib\Exchange\AMQPExchangeType;
+use PhpAmqpLib\Exception\AMQPTimeoutException;
 use Illuminate\Support\Facades\Log;
 use Illuminate\Support\Str;
 
@@ -27,11 +28,11 @@ class Mq
         return $connection;
     }
 
-    public static function publish(string $channelName, $message)
+    public static function publish(string $queue, $message)
     {
         //一对一
         try {
-            Log::debug('mq publish {channel} {message}', ['channel' => $channelName, 'message' => $message]);
+            Log::debug('mq publish', ['queue' => $queue, 'message' => $message]);
             $host = config("queue.connections.rabbitmq.host");
             $port = config("queue.connections.rabbitmq.port");
             $user = config("queue.connections.rabbitmq.user");
@@ -43,10 +44,10 @@ class Mq
             }
             $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
             $channel = $connection->channel();
-            $channel->queue_declare($channelName, false, true, false, false);
+            $channel->queue_declare($queue, false, true, false, false);
 
             $msgId = Str::uuid();
-            Log::info('mq push message id=' . $msgId);
+            Log::info('mq push message queue={$queue} id={$msgId}');
             $msg = new AMQPMessage(
                 json_encode($message, JSON_UNESCAPED_UNICODE),
                 [
@@ -54,7 +55,7 @@ class Mq
                     "content_type" => 'application/json; charset=utf-8'
                 ]
             );
-            $channel->basic_publish($msg, '', $channelName);
+            $channel->basic_publish($msg, '', $queue);
 
             $channel->close();
             $connection->close();
@@ -120,22 +121,26 @@ class Mq
             ]);
             if ($callback !== null) {
                 try {
-                    $result = $callback(json_decode($message->body));
-            $message->ack();
-            Log::debug(
-                'mq done',
-                [
-                    'message_id' => $message->get('message_id')
-                ]
-            );
+                    $result = $callback(json_decode($message->getBody()));
+                    $message->ack();
+                    Log::debug(
+                        'mq done',
+                        [
+                            'message_id' => $message->get('message_id')
+                        ]
+                    );
                     if ($result !== 0) {
                         throw new \Exception('task error');
                     }
                 } catch (\Exception $e) {
-			$message->nack();
+                    $message->nack();
+                    Log::error('mq worker exception', [
+                        'message_id' => $message->get('message_id'),
+                        'exception' => $e
+                    ]);
 
                     // push to issues
-                    Log::error('mq worker exception', ['exception' => $e]);
+                    /*
                     $channelName = 'issues';
                     $channelIssues = $connection->channel();
                     $channelIssues->queue_declare($channelName, false, true, false, false);
@@ -149,12 +154,13 @@ class Mq
                     ], JSON_UNESCAPED_UNICODE));
                     $channelIssues->basic_publish($msg, '', $channelName);
                     $channelIssues->close();
+                    */
                 }
 
-                    if (\App\Tools\Tools::isStop()) {
-                        Log::debug('mq worker: .stop file exist. cancel the consumer.');
-                        $message->getChannel()->basic_cancel($message->getConsumerTag());
-                    }
+                if (\App\Tools\Tools::isStop()) {
+                    Log::debug('mq worker: .stop file exist. cancel the consumer.');
+                    $message->getChannel()->basic_cancel($message->getConsumerTag());
+                }
             }
 
 
@@ -209,7 +215,7 @@ class Mq
         while ($channel->is_consuming()) {
             try {
                 $channel->wait(null, false, $timeout);
-            } catch (\AMQPTimeoutException $e) {
+            } catch (AMQPTimeoutException $e) {
             }
         }
     }