|
@@ -1,18 +1,12 @@
|
|
|
import logging
|
|
import logging
|
|
|
|
|
|
|
|
-from .service import AiTranslateService, SectionTimeout, Message
|
|
|
|
|
|
|
+from .service import AiTranslateService, SectionTimeout, LLMFailException, Message
|
|
|
from .decode_dataclass import ns_to_dataclass
|
|
from .decode_dataclass import ns_to_dataclass
|
|
|
from .utils import is_stopped
|
|
from .utils import is_stopped
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
-class TaskFailException(Exception):
|
|
|
|
|
- def __init__(self, message="task fail"):
|
|
|
|
|
- self.message = message
|
|
|
|
|
- super().__init__(self.message)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def handle_message(redis, ch, method, id, content_type, body, api_url: str, customer_timeout: int):
|
|
def handle_message(redis, ch, method, id, content_type, body, api_url: str, customer_timeout: int):
|
|
|
MaxRetry: int = 3
|
|
MaxRetry: int = 3
|
|
|
try:
|
|
try:
|
|
@@ -22,11 +16,15 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, cust
|
|
|
messages = ns_to_dataclass([body], Message)
|
|
messages = ns_to_dataclass([body], Message)
|
|
|
consumer.process_translate(id, messages[0])
|
|
consumer.process_translate(id, messages[0])
|
|
|
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
|
|
+ logger.info(f'message {id} ack')
|
|
|
except SectionTimeout as e:
|
|
except SectionTimeout as e:
|
|
|
# 时间到了,活还没干完 NACK 并重新入队
|
|
# 时间到了,活还没干完 NACK 并重新入队
|
|
|
logger.info(
|
|
logger.info(
|
|
|
f'time is not enough for complete current message id={id}. requeued')
|
|
f'time is not enough for complete current message id={id}. requeued')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
|
|
+ except LLMFailException as e:
|
|
|
|
|
+ ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
|
|
+ logger.info(f'message {id} ack')
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
# retry
|
|
# retry
|
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
@@ -37,13 +35,14 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, cust
|
|
|
# NACK 丢弃或者进入死信队列
|
|
# NACK 丢弃或者进入死信队列
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=False)
|
|
requeue=False)
|
|
|
- retry = retry+1
|
|
|
|
|
- redis[0].set(retryKey, retry)
|
|
|
|
|
- # NACK 并重新入队
|
|
|
|
|
- logger.warning(f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]')
|
|
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
|
|
- requeue=True)
|
|
|
|
|
- logger.error(f"error: {e}")
|
|
|
|
|
- logger.exception("发生异常")
|
|
|
|
|
|
|
+ else:
|
|
|
|
|
+ retry = retry+1
|
|
|
|
|
+ redis[0].set(retryKey, retry)
|
|
|
|
|
+ # NACK 并重新入队
|
|
|
|
|
+ logger.warning(f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]')
|
|
|
|
|
+ ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
|
|
+ requeue=True)
|
|
|
|
|
+ logger.error(f"error: {e}")
|
|
|
|
|
+ logger.exception("发生异常")
|
|
|
finally:
|
|
finally:
|
|
|
is_stopped()
|
|
is_stopped()
|