Procházet zdrojové kódy

Merge pull request #2345 from visuddhinanda/development

Development
visuddhinanda před 8 měsíci
rodič
revize
5bc4bbbf45

+ 1 - 1
ai-translate/ai_translate/__init__.py

@@ -44,7 +44,7 @@ def start_consumer(context, name, config, queue, callback, proxy):
                        callback, proxy, HeartBeat, name)
                        callback, proxy, HeartBeat, name)
 
 
     channel.basic_consume(
     channel.basic_consume(
-        queue=queue, on_message_callback=_callback, auto_ack=False)
+        queue=queue, on_message_callback=_callback, auto_ack=True)
 
 
     name = "%s.%s.%d" % (name, socket.gethostname(), os.getpid())
     name = "%s.%s.%d" % (name, socket.gethostname(), os.getpid())
     logger.info('start a consumer(%s) for queue(%s)', name, queue)
     logger.info('start a consumer(%s) for queue(%s)', name, queue)

+ 6 - 17
ai-translate/ai_translate/service.py

@@ -131,7 +131,7 @@ class AiTranslateService:
 
 
         self.redis_clusters.set(
         self.redis_clusters.set(
             f"{self.redis_namespace}/task/{self.task.id}/message_id", message_id)
             f"{self.redis_namespace}/task/{self.task.id}/message_id", message_id)
-        pointer_key = f"{self.redis_namespace}/task/{message_id}/pointer"
+        pointer_key = f"{self.redis_namespace}/task/{self.task.id}/pointer"
         pointer = 0
         pointer = 0
 
 
         if self.redis_clusters.exists(pointer_key):
         if self.redis_clusters.exists(pointer_key):
@@ -216,20 +216,6 @@ class AiTranslateService:
             else:
             else:
                 logger.error('no task discussion root')
                 logger.error('no task discussion root')
 
 
-            if i + 1 < len(body.payload):
-                self.redis_clusters.set(pointer_key, i+1)
-                # 计算本次时间和剩余时间
-                # breakpoint()
-                onceTime = int(time.time())-startAt
-                times.append(onceTime)
-                times.sort(reverse=True)
-                # 取出第一个元素
-                maxTime = times[0]
-                # 计算剩余时间
-                remain = self.customer_timeout-(int(time.time())-taskStartAt)
-                if remain < maxTime:
-                    # 时间不足
-                    raise SectionTimeout
         # 任务完成 修改任务状态为 done
         # 任务完成 修改任务状态为 done
         self._set_task_status(self.task.id, 'done')
         self._set_task_status(self.task.id, 'done')
         self.redis_clusters.delete(pointer_key)
         self.redis_clusters.delete(pointer_key)
@@ -570,10 +556,10 @@ class AiTranslateService:
             logger.error(f'处理失败ai任务时出错: {str(e)}')
             logger.error(f'处理失败ai任务时出错: {str(e)}')
 
 
     def handle_retry(self, message_id: str, message: str, exception: Exception):
     def handle_retry(self, message_id: str, message: str, exception: Exception):
-        """处理失败的翻译任务"""
+        """处理失败 需要重试"""
         try:
         try:
             # 失败时的业务逻辑
             # 失败时的业务逻辑
-
+            self._set_task_status(self.task.id, 'pause')
             # 将故障信息写入task discussion
             # 将故障信息写入task discussion
             if self.task_topic_id:
             if self.task_topic_id:
                 error_message = f"任务处理出错 正在重试 \n- message id={message_id} \n- 错误信息:{message} \n- 异常:{str(exception)}"
                 error_message = f"任务处理出错 正在重试 \n- message id={message_id} \n- 错误信息:{message} \n- 异常:{str(exception)}"
@@ -600,3 +586,6 @@ class AiTranslateService:
                 )
                 )
         except Exception as e:
         except Exception as e:
             logger.error(f'处理任务完成时出错: {str(e)}')
             logger.error(f'处理任务完成时出错: {str(e)}')
+
+    def get_task_id(self) -> str:
+        return self.task.id

+ 5 - 16
ai-translate/ai_translate/worker.py

@@ -16,40 +16,29 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
         messages = ns_to_dataclass([body], Message)
         messages = ns_to_dataclass([body], Message)
         consumer.process_translate(id, messages[0])
         consumer.process_translate(id, messages[0])
         logger.info(f'message {id} ack')
         logger.info(f'message {id} ack')
-        ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息
         consumer.handle_complete()
         consumer.handle_complete()
-    except SectionTimeout as e:
-        # 时间到了,活还没干完 NACK 并重新入队
-        logger.warning(
-            f'time is not enough for complete current message id={id}. requeued')
-        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
     except LLMFailException as e:
     except LLMFailException as e:
         errMsg = f'message {id} LLM Fail'
         errMsg = f'message {id} LLM Fail'
         logger.warning(errMsg)
         logger.warning(errMsg)
         consumer.handle_failed(id, errMsg, e)
         consumer.handle_failed(id, errMsg, e)
-        ch.basic_nack(delivery_tag=method.delivery_tag,
-                      requeue=False)
     except Exception as e:
     except Exception as e:
         logger.error(f"error: {e}")
         logger.error(f"error: {e}")
         logger.exception("Exception")
         logger.exception("Exception")
         # retry
         # retry
-        retryKey = f'{redis[1]}/message/retry/{id}'
+        task_id = consumer.get_task_id()
+        retryKey = f'/mq/message/retry/{task_id}'
         retry = int(redis[0].get(retryKey)
         retry = int(redis[0].get(retryKey)
                     or 0) if redis[0].exists(retryKey) else 0
                     or 0) if redis[0].exists(retryKey) else 0
         if retry >= MaxRetry:
         if retry >= MaxRetry:
-            errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id}'
+            errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id} task={task_id}'
+            redis[0].delete(retryKey)
             logger.warning(errMsg)
             logger.warning(errMsg)
             consumer.handle_failed(id, errMsg, e)
             consumer.handle_failed(id, errMsg, e)
-            # NACK 丢弃或者进入死信队列
-            ch.basic_nack(delivery_tag=method.delivery_tag,
-                          requeue=False)
         else:
         else:
             retry = retry+1
             retry = retry+1
             redis[0].set(retryKey, retry)
             redis[0].set(retryKey, retry)
-            # NACK 并重新入队
-            errMsg = f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]'
+            errMsg = f'消息处理错误,需要重试 [{retry}/{MaxRetry}]'
             logger.warning(errMsg)
             logger.warning(errMsg)
             consumer.handle_retry(id, errMsg, e)
             consumer.handle_retry(id, errMsg, e)
-            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
     finally:
     finally:
         is_stopped()
         is_stopped()

+ 24 - 13
dashboard-v4/dashboard/src/components/task/TaskLog.tsx

@@ -1,10 +1,11 @@
-import { Button, Timeline } from "antd";
+import { Button, Skeleton, Timeline } from "antd";
 import React, { useEffect, useState } from "react";
 import React, { useEffect, useState } from "react";
 import { get } from "../../request";
 import { get } from "../../request";
 import { ICommentApiData, ICommentListResponse } from "../api/Comment";
 import { ICommentApiData, ICommentListResponse } from "../api/Comment";
 import TimeShow from "../general/TimeShow";
 import TimeShow from "../general/TimeShow";
 import { StatusButtons, TTaskStatus } from "../api/task";
 import { StatusButtons, TTaskStatus } from "../api/task";
 import { TaskStatusColor } from "./TaskStatus";
 import { TaskStatusColor } from "./TaskStatus";
+import User from "../auth/User";
 
 
 interface IWidget {
 interface IWidget {
   taskId?: string;
   taskId?: string;
@@ -12,15 +13,21 @@ interface IWidget {
 }
 }
 const TaskLog = ({ taskId, onMore }: IWidget) => {
 const TaskLog = ({ taskId, onMore }: IWidget) => {
   const [data, setData] = useState<ICommentApiData[]>();
   const [data, setData] = useState<ICommentApiData[]>();
+  const [total, setTotal] = useState<number>(0);
+  const [loading, setLoading] = useState(false);
   useEffect(() => {
   useEffect(() => {
-    const url: string = `/v2/discussion?view=res_id&id=${taskId}&limit=5`;
+    const url: string = `/v2/discussion?type=discussion&res_type=task&view=question&id=${taskId}&limit=5&offset=0&status=active`;
     console.info("api request", url);
     console.info("api request", url);
-    get<ICommentListResponse>(url).then((json) => {
-      if (json.ok) {
-        console.debug("discussion api response", json);
-        setData(json.data.rows);
-      }
-    });
+    setLoading(true);
+    get<ICommentListResponse>(url)
+      .then((json) => {
+        if (json.ok) {
+          console.debug("discussion api response", json);
+          setData(json.data.rows);
+          setTotal(json.data.count);
+        }
+      })
+      .finally(() => setLoading(false));
   }, [taskId]);
   }, [taskId]);
 
 
   function findKeywordInTitle(title?: string): string | undefined {
   function findKeywordInTitle(title?: string): string | undefined {
@@ -41,12 +48,14 @@ const TaskLog = ({ taskId, onMore }: IWidget) => {
   return (
   return (
     <>
     <>
       <Timeline>
       <Timeline>
+        {loading && <Skeleton paragraph={{ rows: 1 }} active avatar />}
         {data?.map((item, id) => {
         {data?.map((item, id) => {
           const status = findKeywordInTitle(item.title);
           const status = findKeywordInTitle(item.title);
           return (
           return (
             <Timeline.Item
             <Timeline.Item
               key={id}
               key={id}
               color={TaskStatusColor(status as TTaskStatus)}
               color={TaskStatusColor(status as TTaskStatus)}
+              dot={<User {...item.editor} showName={false} />}
             >
             >
               <div>
               <div>
                 <TimeShow
                 <TimeShow
@@ -59,11 +68,13 @@ const TaskLog = ({ taskId, onMore }: IWidget) => {
             </Timeline.Item>
             </Timeline.Item>
           );
           );
         })}
         })}
-        <Timeline.Item>
-          <Button type="link" onClick={onMore}>
-            更多
-          </Button>
-        </Timeline.Item>
+        {total > 5 && (
+          <Timeline.Item>
+            <Button type="link" onClick={onMore}>
+              更多
+            </Button>
+          </Timeline.Item>
+        )}
       </Timeline>
       </Timeline>
     </>
     </>
   );
   );