|
|
@@ -7,26 +7,28 @@ from .utils import is_stopped
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
-def handle_message(redis, ch, method, id, content_type, body, api_url: str,openai_proxy:str, customer_timeout: int):
|
|
|
+def handle_message(redis, ch, method, id, content_type, body, api_url: str, openai_proxy: str, customer_timeout: int):
|
|
|
MaxRetry: int = 3
|
|
|
try:
|
|
|
logger.info("process message start (%s) messages", len(body.payload))
|
|
|
consumer = AiTranslateService(
|
|
|
- redis, ch, method, api_url,openai_proxy, customer_timeout)
|
|
|
+ redis, ch, method, api_url, openai_proxy, customer_timeout)
|
|
|
messages = ns_to_dataclass([body], Message)
|
|
|
consumer.process_translate(id, messages[0])
|
|
|
- ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
logger.info(f'message {id} ack')
|
|
|
+ ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
except SectionTimeout as e:
|
|
|
# 时间到了,活还没干完 NACK 并重新入队
|
|
|
- logger.info(
|
|
|
+ logger.warning(
|
|
|
f'time is not enough for complete current message id={id}. requeued')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
except LLMFailException as e:
|
|
|
+ logger.warning(f'message {id} LLMFailException')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=False)
|
|
|
- logger.warning(f'message {id} LLMFailException')
|
|
|
except Exception as e:
|
|
|
+ logger.error(f"error: {e}")
|
|
|
+ logger.exception("Exception")
|
|
|
# retry
|
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
|
retry = int(redis[0].get(retryKey)
|
|
|
@@ -42,7 +44,5 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str,opena
|
|
|
# NACK 并重新入队
|
|
|
logger.warning(f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
- logger.error(f"error: {e}")
|
|
|
- logger.exception("发生异常")
|
|
|
finally:
|
|
|
is_stopped()
|