|
@@ -23,7 +23,9 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
f'time is not enough for complete current message id={id}. requeued')
|
|
f'time is not enough for complete current message id={id}. requeued')
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
except LLMFailException as e:
|
|
except LLMFailException as e:
|
|
|
- logger.warning(f'message {id} LLMFailException')
|
|
|
|
|
|
|
+ errMsg = f'message {id} LLM Fail'
|
|
|
|
|
+ logger.warning(errMsg)
|
|
|
|
|
+ consumer.handle_failed(id, errMsg, e)
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=False)
|
|
requeue=False)
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
@@ -33,8 +35,10 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
retryKey = f'{redis[1]}/message/retry/{id}'
|
|
|
retry = int(redis[0].get(retryKey)
|
|
retry = int(redis[0].get(retryKey)
|
|
|
or 0) if redis[0].exists(retryKey) else 0
|
|
or 0) if redis[0].exists(retryKey) else 0
|
|
|
- if retry > MaxRetry:
|
|
|
|
|
- logger.warning(f'超过最大重试次数[{MaxRetry}],任务失败 id={id}')
|
|
|
|
|
|
|
+ if retry >= MaxRetry:
|
|
|
|
|
+ errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id}'
|
|
|
|
|
+ logger.warning(errMsg)
|
|
|
|
|
+ consumer.handle_failed(id, errMsg, e)
|
|
|
# NACK 丢弃或者进入死信队列
|
|
# NACK 丢弃或者进入死信队列
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
ch.basic_nack(delivery_tag=method.delivery_tag,
|
|
|
requeue=False)
|
|
requeue=False)
|
|
@@ -42,7 +46,9 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
retry = retry+1
|
|
retry = retry+1
|
|
|
redis[0].set(retryKey, retry)
|
|
redis[0].set(retryKey, retry)
|
|
|
# NACK 并重新入队
|
|
# NACK 并重新入队
|
|
|
- logger.warning(f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]')
|
|
|
|
|
|
|
+ errMsg = f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]'
|
|
|
|
|
+ logger.warning(errMsg)
|
|
|
|
|
+ consumer.handle_retry(id, errMsg, e)
|
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
|
|
finally:
|
|
finally:
|
|
|
is_stopped()
|
|
is_stopped()
|