Просмотр исходного кода

Merge pull request #2264 from visuddhinanda/development

:fire: issuess queue
visuddhinanda 1 год назад
Родитель
Сommit
822091deaf

+ 26 - 20
api-v8/app/Http/Api/Mq.php

@@ -5,6 +5,7 @@ namespace App\Http\Api;
 use PhpAmqpLib\Connection\AMQPStreamConnection;
 use PhpAmqpLib\Message\AMQPMessage;
 use PhpAmqpLib\Exchange\AMQPExchangeType;
+use PhpAmqpLib\Exception\AMQPTimeoutException;
 use Illuminate\Support\Facades\Log;
 use Illuminate\Support\Str;
 
@@ -27,11 +28,11 @@ class Mq
         return $connection;
     }
 
-    public static function publish(string $channelName, $message)
+    public static function publish(string $queue, $message)
     {
         //一对一
         try {
-            Log::debug('mq publish {channel} {message}', ['channel' => $channelName, 'message' => $message]);
+            Log::debug('mq publish', ['queue' => $queue, 'message' => $message]);
             $host = config("queue.connections.rabbitmq.host");
             $port = config("queue.connections.rabbitmq.port");
             $user = config("queue.connections.rabbitmq.user");
@@ -43,10 +44,10 @@ class Mq
             }
             $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
             $channel = $connection->channel();
-            $channel->queue_declare($channelName, false, true, false, false);
+            $channel->queue_declare($queue, false, true, false, false);
 
             $msgId = Str::uuid();
-            Log::info('mq push message id=' . $msgId);
+            Log::info("mq push message queue={$queue} id={$msgId}");
             $msg = new AMQPMessage(
                 json_encode($message, JSON_UNESCAPED_UNICODE),
                 [
@@ -54,7 +55,7 @@ class Mq
                     "content_type" => 'application/json; charset=utf-8'
                 ]
             );
-            $channel->basic_publish($msg, '', $channelName);
+            $channel->basic_publish($msg, '', $queue);
 
             $channel->close();
             $connection->close();
@@ -120,22 +121,26 @@ class Mq
             ]);
             if ($callback !== null) {
                 try {
-                    $result = $callback(json_decode($message->body));
-            $message->ack();
-            Log::debug(
-                'mq done',
-                [
-                    'message_id' => $message->get('message_id')
-                ]
-            );
+                    $result = $callback(json_decode($message->getBody()));
+                    $message->ack();
+                    Log::debug(
+                        'mq done',
+                        [
+                            'message_id' => $message->get('message_id')
+                        ]
+                    );
                     if ($result !== 0) {
                         throw new \Exception('task error');
                     }
                 } catch (\Exception $e) {
-			$message->nack();
+                    $message->nack();
+                    Log::error('mq worker exception', [
+                        'message_id' => $message->get('message_id'),
+                        'exception' => $e
+                    ]);
 
                     // push to issues
-                    Log::error('mq worker exception', ['exception' => $e]);
+                    /*
                     $channelName = 'issues';
                     $channelIssues = $connection->channel();
                     $channelIssues->queue_declare($channelName, false, true, false, false);
@@ -149,12 +154,13 @@ class Mq
                     ], JSON_UNESCAPED_UNICODE));
                     $channelIssues->basic_publish($msg, '', $channelName);
                     $channelIssues->close();
+                    */
                 }
 
-                    if (\App\Tools\Tools::isStop()) {
-                        Log::debug('mq worker: .stop file exist. cancel the consumer.');
-                        $message->getChannel()->basic_cancel($message->getConsumerTag());
-                    }
+                if (\App\Tools\Tools::isStop()) {
+                    Log::debug('mq worker: .stop file exist. cancel the consumer.');
+                    $message->getChannel()->basic_cancel($message->getConsumerTag());
+                }
             }
 
 
@@ -209,7 +215,7 @@ class Mq
         while ($channel->is_consuming()) {
             try {
                 $channel->wait(null, false, $timeout);
-            } catch (\AMQPTimeoutException $e) {
+            } catch (AMQPTimeoutException $e) {
             }
         }
     }

+ 4 - 1
dashboard-v4/dashboard/src/components/task/TaskStatus.tsx

@@ -4,7 +4,10 @@ import { useIntl } from "react-intl";
 import { useEffect, useState } from "react";
 import { get } from "../../request";
 
-const TaskStatus = ({ task }: { task?: ITaskData }) => {
+interface IWidget {
+  task?: ITaskData;
+}
+const TaskStatus = ({ task }: IWidget) => {
   const intl = useIntl();
   const [progress, setProgress] = useState(task?.progress);
 

+ 30 - 28
dashboard-v4/dashboard/src/components/task/TaskStatusButton.tsx

@@ -7,7 +7,7 @@ import {
   PopconfirmProps,
 } from "antd";
 import { useIntl } from "react-intl";
-import { CheckOutlined } from "@ant-design/icons";
+import { CheckOutlined, LoadingOutlined } from "@ant-design/icons";
 
 import {
   ITaskData,
@@ -17,6 +17,13 @@ import {
 } from "../api/task";
 import { patch } from "../../request";
 import TaskStatus from "./TaskStatus";
+import { useState } from "react";
+
+interface IStatusMenu {
+  label: string;
+  key: TTaskStatus;
+  disabled?: boolean;
+}
 
 interface IWidget {
   type?: "button" | "tag";
@@ -30,25 +37,24 @@ const TaskStatusButton = ({
   buttonType = "primary",
   onChange,
 }: IWidget) => {
-  interface IStatusMenu {
-    label: string;
-    key: TTaskStatus;
-    disabled?: boolean;
-  }
   const intl = useIntl();
+  const [loading, setLoading] = useState(false);
 
   const setStatus = (setting: ITaskUpdateRequest) => {
     const url = `/v2/task-status/${setting.id}`;
     console.info("api request", url, setting);
-    patch<ITaskUpdateRequest, ITaskListResponse>(url, setting).then((json) => {
-      console.info("api response", json);
-      if (json.ok) {
-        message.success("Success");
-        onChange && onChange(json.data.rows);
-      } else {
-        message.error(json.message);
-      }
-    });
+    setLoading(true);
+    patch<ITaskUpdateRequest, ITaskListResponse>(url, setting)
+      .then((json) => {
+        console.info("api response", json);
+        if (json.ok) {
+          message.success("Success");
+          onChange && onChange(json.data.rows);
+        } else {
+          message.error(json.message);
+        }
+      })
+      .finally(() => setLoading(false));
   };
   const handleMenuClick: MenuProps["onClick"] = (e) => {
     console.log("click", e);
@@ -179,7 +185,7 @@ const TaskStatusButton = ({
     id: `buttons.task.status.change.to.${newStatus}`,
     defaultMessage: "unknown",
   });
-  return (
+  return type === "button" ? (
     <Popconfirm
       title={intl.formatMessage(
         { id: "message.task.status.change" },
@@ -189,19 +195,15 @@ const TaskStatusButton = ({
       okText="Yes"
       cancelText="No"
     >
-      {type === "button" ? (
-        <Dropdown.Button type={buttonType} trigger={["click"]} menu={menuProps}>
-          <CheckOutlined />
-          {buttonText}
-        </Dropdown.Button>
-      ) : (
-        <Dropdown placement="bottomLeft" menu={menuProps}>
-          <span>
-            <TaskStatus task={task} />
-          </span>
-        </Dropdown>
-      )}
+      <Dropdown.Button type={buttonType} trigger={["click"]} menu={menuProps}>
+        {loading ? <LoadingOutlined /> : <CheckOutlined />}
+        {buttonText}
+      </Dropdown.Button>
     </Popconfirm>
+  ) : (
+    <Dropdown placement="bottomLeft" menu={menuProps}>
+      <span>{loading ? <LoadingOutlined /> : <TaskStatus task={task} />}</span>
+    </Dropdown>
   );
 };