|
|
@@ -1,7 +1,8 @@
|
|
|
import logging
|
|
|
-from .ai_translate import AiTranslateService, SectionTimeout, Message, Sentence, AiModel, Task, TaskProgress
|
|
|
-from .decode_dataclass import ns_to_dataclass, decode_dataclass, decode_json_to_type, decode_dataclass_list
|
|
|
-from typing import List
|
|
|
+
|
|
|
+from .ai_translate import AiTranslateService, SectionTimeout, Message
|
|
|
+from .decode_dataclass import ns_to_dataclass
|
|
|
+from . import is_stopped
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -13,7 +14,7 @@ class TaskFailException(Exception):
|
|
|
|
|
|
|
|
|
def handle_message(redis, ch, method, id, content_type, body, api_url, customer_timeout):
|
|
|
- maxRetry = 3
|
|
|
+ MaxRetry = 3
|
|
|
try:
|
|
|
logger.info("process message start (%s) messages", len(body.payload))
|
|
|
consumer = AiTranslateService(
|
|
|
@@ -23,17 +24,15 @@ def handle_message(redis, ch, method, id, content_type, body, api_url, customer_
|
|
|
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
except SectionTimeout as e:
|
|
|
# 时间到了,活还没干完 NACK 并重新入队
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
- requeue=True)
|
|
|
- pass
|
|
|
+ ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
except Exception as e:
|
|
|
# retry
|
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
|
retry = 0
|
|
|
if redis[0].exists(retryKey):
|
|
|
retry = redis[0].get(retryKey)
|
|
|
- if retry > maxRetry:
|
|
|
- logger.error(f'超过最大重试次数[{maxRetry}],任务失败')
|
|
|
+ if retry > MaxRetry:
|
|
|
+ logger.error(f'超过最大重试次数[{MaxRetry}],任务失败')
|
|
|
# NACK 丢弃或者进入死信队列
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=False)
|
|
|
@@ -41,8 +40,10 @@ def handle_message(redis, ch, method, id, content_type, body, api_url, customer_
|
|
|
retry = retry+1
|
|
|
redis[0].set(retryKey, retry)
|
|
|
# NACK 并重新入队
|
|
|
- logger.warning(f'消息处理错误,重新压入队列 [{retry}/{maxRetry}]')
|
|
|
+ logger.warning(f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=True)
|
|
|
logger.error(f"error: {e}")
|
|
|
logger.exception("发生异常")
|
|
|
+ finally:
|
|
|
+ is_stopped()
|