| 1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import logging
- from .service import AiTranslateService, SectionTimeout, LLMFailException, Message
- from .decode_dataclass import ns_to_dataclass
- from .utils import is_stopped
- logger = logging.getLogger(__name__)
- def handle_message(redis, ch, method, id, content_type, body, api_url: str, openai_proxy: str, customer_timeout: int, worker_name: str):
- MaxRetry: int = 3
- try:
- logger.info("process message start (%s) messages", len(body.payload))
- consumer = AiTranslateService(
- redis, ch, method, api_url, openai_proxy, customer_timeout, worker_name)
- messages = ns_to_dataclass([body], Message)
- consumer.process_translate(id, messages[0])
- logger.info(f'message {id} ack')
- consumer.handle_complete()
- except LLMFailException as e:
- errMsg = f'message {id} LLM Fail'
- logger.warning(errMsg)
- consumer.handle_failed(id, errMsg, e)
- except Exception as e:
- logger.error(f"error: {e}")
- logger.exception("Exception")
- # retry
- task_id = consumer.get_task_id()
- retryKey = f'/mq/message/retry/{task_id}'
- retry = int(redis[0].get(retryKey)
- or 0) if redis[0].exists(retryKey) else 0
- if retry >= MaxRetry:
- errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id} task={task_id}'
- redis[0].delete(retryKey)
- logger.warning(errMsg)
- consumer.handle_failed(id, errMsg, e)
- else:
- retry = retry+1
- redis[0].set(retryKey, retry)
- errMsg = f'消息处理错误,需要重试 [{retry}/{MaxRetry}]'
- logger.warning(errMsg)
- consumer.handle_retry(id, errMsg, e)
- finally:
- is_stopped()
|