|
@@ -16,18 +16,10 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
messages = ns_to_dataclass([body], Message)
|
|
messages = ns_to_dataclass([body], Message)
|
|
|
consumer.process_translate(id, messages[0])
|
|
consumer.process_translate(id, messages[0])
|
|
|
logger.info(f'message {id} ack')
|
|
logger.info(f'message {id} ack')
|
|
|
- ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
|
|
|
|
|
consumer.handle_complete()
|
|
consumer.handle_complete()
|
|
|
- except SectionTimeout as e:
|
|
|
|
|
- # 时间到了,活还没干完 NACK 并重新入队
|
|
|
|
|
- logger.warning(
|
|
|
|
|
- f'time is not enough for complete current message id={id}. requeued')
|
|
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
|
|
except LLMFailException as e:
|
|
except LLMFailException as e:
|
|
|
errMsg = f'message {id} LLM Fail'
|
|
errMsg = f'message {id} LLM Fail'
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
- consumer.handle_failed(id, errMsg, e)
|
|
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
|
|
requeue=False)
|
|
requeue=False)
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"error: {e}")
|
|
logger.error(f"error: {e}")
|
|
@@ -40,9 +32,6 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id}'
|
|
errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id}'
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
consumer.handle_failed(id, errMsg, e)
|
|
consumer.handle_failed(id, errMsg, e)
|
|
|
- # NACK 丢弃或者进入死信队列
|
|
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
|
|
- requeue=False)
|
|
|
|
|
else:
|
|
else:
|
|
|
retry = retry+1
|
|
retry = retry+1
|
|
|
redis[0].set(retryKey, retry)
|
|
redis[0].set(retryKey, retry)
|
|
@@ -50,6 +39,5 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
errMsg = f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]'
|
|
errMsg = f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]'
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
consumer.handle_retry(id, errMsg, e)
|
|
consumer.handle_retry(id, errMsg, e)
|
|
|
- ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
|
|
finally:
|
|
finally:
|
|
|
is_stopped()
|
|
is_stopped()
|