visuddhinanda 2 лет назад
Родитель
Сommit
e54bed5fce
1 измененных файлов с 48 добавлено и 44 удалено
  1. 48 44
      app/Http/Api/Mq.php

+ 48 - 44
app/Http/Api/Mq.php

@@ -1,52 +1,56 @@
 <?php
+
 namespace App\Http\Api;
+
 use PhpAmqpLib\Connection\AMQPStreamConnection;
 use PhpAmqpLib\Message\AMQPMessage;
 use PhpAmqpLib\Exchange\AMQPExchangeType;
 use Illuminate\Support\Facades\Log;
 
-class Mq{
+class Mq
+{
 
-    private static function connection(){
+    private static function connection()
+    {
         $host = config("queue.connections.rabbitmq.host");
-            $port = config("queue.connections.rabbitmq.port");
-            $user = config("queue.connections.rabbitmq.user");
-            $password = config("queue.connections.rabbitmq.password");
-            $vhost = config("queue.connections.rabbitmq.password");
-            if(empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)){
-                Log::error('rabbitmq set error');
-                return;
-            }
-            $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
-            return $connection;
+        $port = config("queue.connections.rabbitmq.port");
+        $user = config("queue.connections.rabbitmq.user");
+        $password = config("queue.connections.rabbitmq.password");
+        $vhost = config("queue.connections.rabbitmq.password");
+        if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
+            Log::error('rabbitmq set error');
+            return;
+        }
+        $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
+        return $connection;
     }
-    public static function publish(string $channelName, $message){
-       //一对一
-        try{
-            Log::debug('mq start {channel} {message}',['channel'=>$channelName,'message'=>$message]);
+    public static function publish(string $channelName, $message)
+    {
+        //一对一
+        try {
+            Log::debug('mq start {channel} {message}', ['channel' => $channelName, 'message' => $message]);
             $host = config("queue.connections.rabbitmq.host");
             $port = config("queue.connections.rabbitmq.port");
             $user = config("queue.connections.rabbitmq.user");
             $password = config("queue.connections.rabbitmq.password");
             $vhost = config("queue.connections.rabbitmq.virtual_host");
-            if(empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)){
+            if (empty($host) || empty($port) || empty($user) || empty($password) || empty($vhost)) {
                 Log::error('rabbitmq set error');
                 return;
             }
-            $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
+            $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
             $channel = $connection->channel();
             $channel->queue_declare($channelName, false, true, false, false);
 
-            $msg = new AMQPMessage(json_encode($message,JSON_UNESCAPED_UNICODE));
+            $msg = new AMQPMessage(json_encode($message, JSON_UNESCAPED_UNICODE));
             $channel->basic_publish($msg, '', $channelName);
 
             $channel->close();
             $connection->close();
-        }catch(\Exception $e){
+        } catch (\Exception $e) {
             Log::error($e);
             return;
         }
-
     }
 
     /**
@@ -54,7 +58,8 @@ class Mq{
      * @param string $queue
      * @param callable|null $callback
      */
-    public static function worker($exchange,$queue,$callback=null){
+    public static function worker($exchange, $queue, $callback = null)
+    {
 
         $consumerTag = 'consumer';
 
@@ -64,17 +69,17 @@ class Mq{
         $user = config("queue.connections.rabbitmq.user");
         $password = config("queue.connections.rabbitmq.password");
         $vhost = config("queue.connections.rabbitmq.virtual_host");
-        $connection = new AMQPStreamConnection($host,$port,$user,$password,$vhost);
+        $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
 
         $channel = $connection->channel();
 
- /*
+        /*
      The following code is the same both in the consumer and the producer.
      In this way we are sure we always have a queue to consume from and an
          exchange where to publish messages.
  */
 
- /*
+        /*
      name: $queue
      passive: false
      durable: true // the queue will survive server restarts
@@ -96,32 +101,32 @@ class Mq{
 
         /**
          * @param \PhpAmqpLib\Message\AMQPMessage $message
-        */
-        $process_message = function ($message) use($callback,$connection,$exchange,$queue)
-        {
-            if($callback !== null){
-                try{
+         */
+        $process_message = function ($message) use ($callback, $connection, $exchange, $queue) {
+            if ($callback !== null) {
+                try {
                     $result = $callback(json_decode($message->body));
-                    if(\App\Tools\Tools::isStop()){
+                    if (\App\Tools\Tools::isStop()) {
                         Log::debug('mq worker: .stop file exist. cancel the consumer.');
                         $message->getChannel()->basic_cancel($message->getConsumerTag());
                     }
-                    if($result !== 0){
+                    if ($result !== 0) {
                         throw new \Exception('task error');
                     }
-                }catch(\Exception $e){
+                } catch (\Exception $e) {
                     // push to issues
-                    Log::error('mq worker exception',$e);
+                    Log::error('mq worker exception', $e);
                     $channelName = 'issues';
                     $channelIssues = $connection->channel();
                     $channelIssues->queue_declare($channelName, false, true, false, false);
 
-                    $msg = new AMQPMessage(json_encode(['exchange'=>$exchange,
-                                                        'channel'=>$queue,
-                                                        'message'=>json_decode($message->body),
-                                                        'result'=>0,
-                                                        'error'=>$e,
-                                                        ],JSON_UNESCAPED_UNICODE));
+                    $msg = new AMQPMessage(json_encode([
+                        'exchange' => $exchange,
+                        'channel' => $queue,
+                        'message' => json_decode($message->body),
+                        'result' => 0,
+                        'error' => $e,
+                    ], JSON_UNESCAPED_UNICODE));
                     $channelIssues->basic_publish($msg, '', $channelName);
                     $channelIssues->close();
                 }
@@ -150,10 +155,9 @@ class Mq{
 
         /**
          * @param \PhpAmqpLib\Channel\AMQPChannel $channel
-        * @param \PhpAmqpLib\Connection\AbstractConnection $connection
-        */
-        $shutdown = function ($channel, $connection)
-        {
+         * @param \PhpAmqpLib\Connection\AbstractConnection $connection
+         */
+        $shutdown = function ($channel, $connection) {
             $channel->close();
             $connection->close();
         };