worker.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import logging
  2. from .service import AiTranslateService, SectionTimeout, LLMFailException, Message
  3. from .decode_dataclass import ns_to_dataclass
  4. from .utils import is_stopped
  5. logger = logging.getLogger(__name__)
  6. def handle_message(redis, ch, method, id, content_type, body, api_url: str, openai_proxy: str, customer_timeout: int, worker_name: str):
  7. MaxRetry: int = 3
  8. try:
  9. logger.info("process message start (%s) messages", len(body.payload))
  10. consumer = AiTranslateService(
  11. redis, ch, method, api_url, openai_proxy, customer_timeout, worker_name)
  12. messages = ns_to_dataclass([body], Message)
  13. consumer.process_translate(id, messages[0])
  14. logger.info(f'message {id} ack')
  15. consumer.handle_complete()
  16. except LLMFailException as e:
  17. errMsg = f'message {id} LLM Fail'
  18. logger.warning(errMsg)
  19. consumer.handle_failed(id, errMsg, e)
  20. except Exception as e:
  21. logger.error(f"error: {e}")
  22. logger.exception("Exception")
  23. # retry
  24. task_id = consumer.get_task_id()
  25. retryKey = f'/mq/message/retry/{task_id}'
  26. retry = int(redis[0].get(retryKey)
  27. or 0) if redis[0].exists(retryKey) else 0
  28. if retry >= MaxRetry:
  29. errMsg = f'超过最大重试次数[{MaxRetry}],任务失败 id={id} task={task_id}'
  30. redis[0].delete(retryKey)
  31. logger.warning(errMsg)
  32. consumer.handle_failed(id, errMsg, e)
  33. else:
  34. retry = retry+1
  35. redis[0].set(retryKey, retry)
  36. errMsg = f'消息处理错误,需要重试 [{retry}/{MaxRetry}]'
  37. logger.warning(errMsg)
  38. consumer.handle_retry(id, errMsg, e)
  39. finally:
  40. is_stopped()