Explorar el Código

改为非阻塞

visuddhinanda hace 2 años
padre
commit
1d6e9c7ed1

+ 6 - 17
app/Console/Commands/MqDiscussion.php

@@ -3,9 +3,9 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
 use App\Models\Sentence;
 use App\Models\WebHook;
+use App\Http\Api\Mq;
 
 class MqDiscussion extends Command
 {
@@ -40,15 +40,10 @@ class MqDiscussion extends Command
      */
     public function handle()
     {
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
-		$channel = $connection->channel();
-
-		$channel->queue_declare('discussion', false, true, false, false);
-
-		$this->info(" [*] Waiting for wbw-analyses. To exit press CTRL+C");
-
-		$callback = function ($msg) {
-            $message = json_decode($msg->body);
+        $exchange = 'router';
+        $queue = 'discussion';
+        $this->info(" [*] Waiting for {$queue}. To exit press CTRL+C");
+        Mq::worker($exchange,$queue,function ($message){
             switch ($message->res_type) {
                 case 'sentence':
                     $sentence = Sentence::where('uid',$message->res_id)->first();
@@ -107,14 +102,8 @@ class MqDiscussion extends Command
                     # code...
                     break;
             }
+        });
 
-		};
-
-		$channel->basic_consume('discussion', '', false, true, false, false, $callback);
-
-		while ($channel->is_open()) {
-			  $channel->wait();
-		  }
         return 0;
     }
 }

+ 14 - 13
app/Console/Commands/MqPr.php

@@ -3,9 +3,10 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use App\Http\Api\Mq;
 use App\Models\Sentence;
 use App\Models\WebHook;
+use App\Models\PaliSentence;
 
 class MqPr extends Command
 {
@@ -40,16 +41,10 @@ class MqPr extends Command
      */
     public function handle()
     {
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
-		$channel = $connection->channel();
-
-		$channel->queue_declare('suggestion', false, true, false, false);
-
-		$this->info(" [*] Waiting for suggestion. To exit press CTRL+C");
-
-		$callback = function ($msg) {
-            $message = json_decode($msg->body);
-
+        $exchange = 'router';
+        $queue = 'suggestion';
+        $this->info(" [*] Waiting for {$queue}. To exit press CTRL+C");
+        Mq::worker($exchange,$queue,function ($message){
             /**生成消息内容 */
             $msgTitle = '';
             $username = $message->editor->nickName;
@@ -61,14 +56,14 @@ class MqPr extends Command
             $prtext = mb_substr($message->content,0,140,"UTF-8");
             $sent_num = "{$message->book}-{$message->paragraph}-{$message->word_start}-{$message->word_end}";
             $link = "https://next.wikipali.org/pcd/article/para/{$message->book}-{$message->paragraph}";
-            $link .= "?book={$message->book}&par={$message->paragraph}&channel={$message->channel_uid}";
+            $link .= "?book={$message->book}&par={$message->paragraph}&channel={$message->channel->id}";
 
             $msgContent = "{$username} 就文句`{$palitext}`提出了修改建议:
             >内容摘要:<font color=\"comment\">{$prtext}</font>,\n
             >句子编号:<font color=\"info\">{$sent_num}</font>\n
             欢迎大家[点击链接]({$link})查看并讨论。";
 
-            $webhooks = WebHook::where('res_id',$message->channel_uid)
+            $webhooks = WebHook::where('res_id',$message->channel->id)
                             ->where('status','active')
                             ->get();
             foreach ($webhooks as $key => $hook) {
@@ -99,6 +94,12 @@ class MqPr extends Command
                     WebHook::where('id',$hook->id)->increment('fail');
                 }
             }
+        });
+
+		$callback = function ($msg) {
+            $message = json_decode($msg->body);
+
+
 
 
 		};

+ 6 - 17
app/Console/Commands/MqProgress.php

@@ -3,7 +3,7 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use App\Http\Api\Mq;
 
 class MqProgress extends Command
 {
@@ -38,16 +38,10 @@ class MqProgress extends Command
      */
     public function handle()
     {
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
-		$channel = $connection->channel();
-
-		$channel->queue_declare('progress', false, true, false, false);
-
-		$this->info(" [*] Waiting for messages. To exit press CTRL+C");
-
-		$callback = function ($msg) {
-            $message = json_decode($msg->body);
-
+        $exchange = 'router';
+        $queue = 'progress';
+        $this->info(" [*] Waiting for {$queue}. To exit press CTRL+C");
+        Mq::worker($exchange,$queue,function ($message){
             $ok = $this->call('upgrade:progress',['--book'=>$message->book,
                                             '--para'=>$message->para,
                                             '--channel'=>$message->channel,
@@ -57,13 +51,8 @@ class MqProgress extends Command
                                                 '--channel'=>$message->channel,
                                                 ]);
             $this->info("Received book=".$message->book.' progress='.$ok.' chapter='.$ok2);
-		};
-
-		$channel->basic_consume('progress', '', false, true, false, false, $callback);
+        });
 
-		while ($channel->is_open()) {
-			  $channel->wait();
-		  }
         return 0;
     }
 }

+ 6 - 16
app/Console/Commands/MqWbwAnalyses.php

@@ -3,7 +3,7 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use App\Http\Api\Mq;
 
 class MqWbwAnalyses extends Command
 {
@@ -38,24 +38,14 @@ class MqWbwAnalyses extends Command
      */
     public function handle()
     {
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
-		$channel = $connection->channel();
-
-		$channel->queue_declare('wbw-analyses', false, true, false, false);
-
-		$this->info(" [*] Waiting for wbw-analyses. To exit press CTRL+C");
-
-		$callback = function ($msg) {
-            $message = json_decode($msg->body);
+        $exchange = 'router';
+        $queue = 'wbw-analyses';
+        $this->info(" [*] Waiting for {$queue}. To exit press CTRL+C");
+        Mq::worker($exchange,$queue,function ($message){
             $ok = $this->call('upgrade:wbw.analyses',['id'=>implode(',',$message)]);
             $this->info("Received count=".count($message).' ok='.$ok);
-		};
-
-		$channel->basic_consume('wbw-analyses', '', false, true, false, false, $callback);
+        });
 
-		while ($channel->is_open()) {
-			  $channel->wait();
-		  }
         return 0;
     }
 }

+ 2 - 2
app/Console/Commands/TestMq.php

@@ -40,8 +40,8 @@ class TestMq extends Command
     public function handle()
     {
         //一对一
-		Mq::publish('wbw-analyses',[13607982709477376]);
-		//Mq::publish('hello','hello world');
+		//Mq::publish('wbw-analyses',[13607982709477376]);
+		Mq::publish('hello',['hello world']);
 
         //一对多
         /*

+ 8 - 19
app/Console/Commands/TestMqWorker.php

@@ -3,8 +3,7 @@
 namespace App\Console\Commands;
 
 use Illuminate\Console\Command;
-
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use App\Http\Api\Mq;
 
 class TestMqWorker extends Command
 {
@@ -13,7 +12,7 @@ class TestMqWorker extends Command
      *
      * @var string
      */
-    protected $signature = 'test:mqworker';
+    protected $signature = 'test:mq.worker';
 
     /**
      * The console command description.
@@ -39,22 +38,12 @@ class TestMqWorker extends Command
      */
     public function handle()
     {
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
-		$channel = $connection->channel();
-
-		$channel->queue_declare('hello', false, true, false, false);
-
-		echo " [*] Waiting for messages. To exit press CTRL+C\n";
-
-		$callback = function ($msg) {
-			echo ' [x] Received ', $msg->body, "\n";
-		};
-
-		$channel->basic_consume('hello', '', false, true, false, false, $callback);
-
-		while ($channel->is_open()) {
-			  $channel->wait();
-		  }
+        $exchange = 'router';
+        $queue = 'hello';
+        $this->info(" [*] Waiting for {$queue}. To exit press CTRL+C");
+        Mq::worker($exchange,$queue,function ($message){
+            print_r($message);
+        });
         return 0;
     }
 }

+ 98 - 1
app/Http/Api/Mq.php

@@ -2,10 +2,17 @@
 namespace App\Http\Api;
 use PhpAmqpLib\Connection\AMQPStreamConnection;
 use PhpAmqpLib\Message\AMQPMessage;
+use PhpAmqpLib\Exchange\AMQPExchangeType;
+
 class Mq{
     public static function publish(string $channelName, $message){
                 //一对一
-		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"), env("RABBITMQ_PORT"), env("RABBITMQ_USERNAME"), env("RABBITMQ_PASSWORD"));
+		$connection = new AMQPStreamConnection(env("RABBITMQ_HOST"),
+                                               env("RABBITMQ_PORT"),
+                                               env("RABBITMQ_USERNAME"),
+                                               env("RABBITMQ_PASSWORD"),
+                                               env("RABBITMQ_VIRTUAL_HOST")
+                                            );
 		$channel = $connection->channel();
 		$channel->queue_declare($channelName, false, true, false, false);
 
@@ -15,4 +22,94 @@ class Mq{
 		$channel->close();
 		$connection->close();
     }
+
+    /**
+     * @param string $exchange
+     * @param string $queue
+     * @param callable|null $callback
+     */
+    public static function worker($exchange,$queue,$callback=null){
+
+        $consumerTag = 'consumer';
+
+        $connection = new AMQPStreamConnection(env("RABBITMQ_HOST"),
+                                               env("RABBITMQ_PORT"),
+                                               env("RABBITMQ_USERNAME"),
+                                               env("RABBITMQ_PASSWORD"),
+                                               env("RABBITMQ_VIRTUAL_HOST"));
+        $channel = $connection->channel();
+
+ /*
+     The following code is the same both in the consumer and the producer.
+     In this way we are sure we always have a queue to consume from and an
+         exchange where to publish messages.
+ */
+
+ /*
+     name: $queue
+     passive: false
+     durable: true // the queue will survive server restarts
+     exclusive: false // the queue can be accessed in other channels
+     auto_delete: false //the queue won't be deleted once the channel is closed.
+ */
+        $channel->queue_declare($queue, false, true, false, false);
+
+        /*
+            name: $exchange
+            type: direct
+            passive: false
+            durable: true // the exchange will survive server restarts
+            auto_delete: false //the exchange won't be deleted once the channel is closed.
+        */
+
+        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
+        $channel->queue_bind($queue, $exchange);
+
+        /**
+         * @param \PhpAmqpLib\Message\AMQPMessage $message
+        */
+        $process_message = function ($message) use($callback)
+        {
+            if($callback !== null){
+                $callback(json_decode($message->body));
+            }
+            $message->ack();
+
+            // Send a message with the string "quit" to cancel the consumer.
+            /*
+            if ($message->body === 'quit') {
+                $message->getChannel()->basic_cancel($message->getConsumerTag());
+            }
+            */
+        };
+
+        /*
+            queue: Queue from where to get the messages
+            consumer_tag: Consumer identifier
+            no_local: Don't receive messages published by this consumer.
+            no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
+            exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
+            nowait:
+            callback: A PHP Callback
+        */
+        $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);
+
+        /**
+         * @param \PhpAmqpLib\Channel\AMQPChannel $channel
+        * @param \PhpAmqpLib\Connection\AbstractConnection $connection
+        */
+        $shutdown = function ($channel, $connection)
+        {
+            $channel->close();
+            $connection->close();
+        };
+
+        register_shutdown_function($shutdown, $channel, $connection);
+        // Loop as long as the channel has callbacks registered
+        while ($channel->is_consuming()) {
+            $channel->wait(null, true);
+            // do something else
+            usleep(300000);
+        }
+    }
 }