瀏覽代碼

处理重试

visuddhinanda 9 月之前
父節點
當前提交
540c9ce5d5
共有 1 個文件被更改,包括 43 次插入2 次删除
  1. 43 2
      ai-translate/ai_translate/worker.py

+ 43 - 2
ai-translate/ai_translate/worker.py

@@ -1,7 +1,48 @@
 import logging
 import logging
+from .ai_translate import AiTranslateService, SectionTimeout, Message, Sentence, AiModel, Task, TaskProgress
+from .decode_dataclass import ns_to_dataclass, decode_dataclass, decode_json_to_type, decode_dataclass_list
+from typing import List
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
 
 
 
 
-def handle_message(context, id, content_type, body):
-    logger.info("TODO: --- using redis namespace(%s) ---", context[1])
+class TaskFailException(Exception):
+    def __init__(self, message="task fail"):
+        self.message = message
+        super().__init__(self.message)
+
+
+def handle_message(redis, ch, method, id, content_type, body, api_url, customer_timeout):
+    maxRetry = 3
+    try:
+        logger.info("process message start (%s) messages", len(body.payload))
+        consumer = AiTranslateService(
+            redis, ch, method, api_url, customer_timeout)
+        messages = ns_to_dataclass([body], Message)
+        consumer.process_translate(id, messages[0])
+        ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息
+    except SectionTimeout as e:
+        # 时间到了,活还没干完 NACK 并重新入队
+        ch.basic_nack(delivery_tag=method.delivery_tag,
+                      requeue=True)
+        pass
+    except Exception as e:
+        # retry
+        retryKey = f'{redis[1]}/message/retry/{id}'
+        retry = 0
+        if redis[0].exists(retryKey):
+            retry = redis[0].get(retryKey)
+        if retry > maxRetry:
+            logger.error(f'超过最大重试次数[{maxRetry}],任务失败')
+            # NACK 丢弃或者进入死信队列
+            ch.basic_nack(delivery_tag=method.delivery_tag,
+                          requeue=False)
+            raise TaskFailException
+        retry = retry+1
+        redis[0].set(retryKey, retry)
+        # NACK 并重新入队
+        logger.warning(f'消息处理错误,重新压入队列 [{retry}/{maxRetry}]')
+        ch.basic_nack(delivery_tag=method.delivery_tag,
+                      requeue=True)
+        logger.error(f"error: {e}")
+        logger.exception("发生异常")