|
@@ -20,23 +20,24 @@ def handle_message(redis, ch, method, id, content_type, body, api_url: str, open
|
|
|
except LLMFailException as e:
|
|
except LLMFailException as e:
|
|
|
errMsg = f'message {id} LLM Fail'
|
|
errMsg = f'message {id} LLM Fail'
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
- requeue=False)
|
|
|
|
|
|
|
+ consumer.handle_failed(id, errMsg, e)
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"error: {e}")
|
|
logger.error(f"error: {e}")
|
|
|
logger.exception("Exception")
|
|
logger.exception("Exception")
|
|
|
# retry
|
|
# retry
|
|
|
- retryKey = f'{redis[1]}/message/retry/{id}'
|
|
|
|
|
|
|
+ task_id = consumer.get_task_id()
|
|
|
|
|
+ retryKey = f'/mq/message/retry/{task_id}'
|
|
|
retry = int(redis[0].get(retryKey)
|
|
retry = int(redis[0].get(retryKey)
|
|
|
or 0) if redis[0].exists(retryKey) else 0
|
|
or 0) if redis[0].exists(retryKey) else 0
|
|
|
if retry >= MaxRetry:
|
|
if retry >= MaxRetry:
|
|
|
- errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id}'
|
|
|
|
|
|
|
+ errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id} task={task_id}'
|
|
|
|
|
+ redis[0].delete(retryKey)
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
consumer.handle_failed(id, errMsg, e)
|
|
consumer.handle_failed(id, errMsg, e)
|
|
|
else:
|
|
else:
|
|
|
retry = retry+1
|
|
retry = retry+1
|
|
|
redis[0].set(retryKey, retry)
|
|
redis[0].set(retryKey, retry)
|
|
|
- # NACK 并重新入队
|
|
|
|
|
- errMsg = f'消息处理错误,重新压入队列 [{retry}/{MaxRetry}]'
|
|
|
|
|
|
|
+ errMsg = f'消息处理错误,需要重试 [{retry}/{MaxRetry}]'
|
|
|
logger.warning(errMsg)
|
|
logger.warning(errMsg)
|
|
|
consumer.handle_retry(id, errMsg, e)
|
|
consumer.handle_retry(id, errMsg, e)
|
|
|
finally:
|
|
finally:
|