Просмотр исходного кода

Merge pull request #2306 from visuddhinanda/development

Development
visuddhinanda 9 месяцев назад
Родитель
Сommit
ef2e3fa9a4

+ 1 - 0
ai-translate/.gitignore

@@ -3,3 +3,4 @@
 
 __pycache__/
 ai_translate.egg-info/
+.stop

+ 1 - 1
ai-translate/README.md

@@ -9,7 +9,7 @@ $ source $HOME/tmp/python3/bin/activate
 > python3 -m pip install -e .
 
 > python3 -m ai_translate -h
-> python3 -m ai_translate -d -c config.toml -n worker-us-1 -q ai.translate.us
+> python3 -m ai_translate -d -c config.toml -n worker-us-1 -q ai_translate_cn
 
 # exit the virtual env
 > deactivate

+ 15 - 7
ai-translate/ai_translate/__init__.py

@@ -1,10 +1,12 @@
 import logging
 import tomllib
+import json
 import os
 import sys
 
 import pika
 from redis.cluster import RedisCluster
+from types import SimpleNamespace
 
 from .worker import handle_message
 
@@ -25,22 +27,25 @@ def open_redis_cluster(config):
 
 
 def start_consumer(context, name, queue, config):
+    mq_config = config['rabbitmq']
     connection = pika.BlockingConnection(
         pika.ConnectionParameters(
-            host=config['host'], port=config['port'],
+            host=mq_config['host'], port=mq_config['port'],
             credentials=pika.PlainCredentials(
-                config['user'], config['password']),
-            virtual_host=config['virtual-host']))
+                mq_config['user'], mq_config['password']),
+            virtual_host=mq_config['virtual-host']))
     channel = connection.channel()
 
     def callback(ch, method, properties, body):
         logger.info("received message(%s,%s)",
                     properties.message_id, properties.content_type)
-        handle_message(context, properties.message_id,
-                       properties.content_type, body)
+        handle_message(context, ch, method, properties.message_id,
+                       properties.content_type, json.loads(
+                           body, object_hook=SimpleNamespace),
+                       config['app']['api-url'], config['rabbitmq']['customer-timeout'])
 
     channel.basic_consume(
-        queue=queue, on_message_callback=callback, auto_ack=True)
+        queue=queue, on_message_callback=callback, auto_ack=False)
 
     logger.info('start a consumer(%s) for queue(%s)', name, queue)
     channel.start_consuming()
@@ -51,4 +56,7 @@ def launch(name, queue, config_file):
     with open(config_file, "rb") as config_fd:
         config = tomllib.load(config_fd)
         redis_cli = open_redis_cluster(config['redis'])
-        start_consumer(redis_cli, name, queue, config['rabbitmq'])
+        logger.info('api-url:(%s)', config['app']['api-url'])
+        logger.info('customer-timeout:(%s)',
+                    config['rabbitmq']['customer-timeout'])
+        start_consumer(redis_cli, name, queue, config)

+ 141 - 148
ai-translate/ai_translate/ai_translate.py

@@ -4,10 +4,10 @@ import logging
 import requests
 from typing import List, Dict, Any, Optional
 from datetime import datetime
-import redis
 from requests.exceptions import RequestException
 from dataclasses import dataclass
-from abc import ABC, abstractmethod
+import pdb
+import time
 
 # 配置日志
 logging.basicConfig(level=logging.INFO)
@@ -15,8 +15,15 @@ logger = logging.getLogger(__name__)
 
 
 class DatabaseException(Exception):
-    """数据库异常"""
-    pass
+    def __init__(self, message="database api access exception"):
+        self.message = message
+        super().__init__(self.message)
+
+
+class SectionTimeout(Exception):
+    def __init__(self, message="片段超时"):
+        self.message = message
+        super().__init__(self.message)
 
 
 @dataclass
@@ -27,7 +34,7 @@ class TaskProgress:
 
 
 @dataclass
-class Task:
+class TaskInfo:
     """任务模型"""
     id: str
     title: str
@@ -36,6 +43,13 @@ class Task:
     status: str
 
 
+@dataclass
+class Task:
+    """任务模型"""
+    info: TaskInfo
+    progress: TaskProgress
+
+
 @dataclass
 class AiModel:
     """AI模型配置"""
@@ -61,7 +75,7 @@ class Sentence:
 
 
 @dataclass
-class Message:
+class Payload:
     """消息模型"""
     model: AiModel
     task: Task
@@ -69,180 +83,113 @@ class Message:
     sentence: Sentence
 
 
-class RedisClusters:
-    """Redis集群工具类"""
-
-    def __init__(self, host='localhost', port=6379, db=0):
-        self.redis_client = redis.Redis(host=host, port=port, db=db)
-
-    def put(self, key: str, value: Any, ttl: Optional[int] = None):
-        """存储数据"""
-        if isinstance(value, (dict, list)):
-            value = json.dumps(value)
-        if ttl:
-            self.redis_client.setex(key, ttl, value)
-        else:
-            self.redis_client.set(key, value)
-
-    def get(self, key: str) -> Any:
-        """获取数据"""
-        value = self.redis_client.get(key)
-        if value:
-            return value.decode('utf-8')
-        return None
-
-    def has(self, key: str) -> bool:
-        """检查键是否存在"""
-        return self.redis_client.exists(key) > 0
-
-    def forget(self, key: str):
-        """删除键"""
-        self.redis_client.delete(key)
-
-
-class JobInterface(ABC):
-    """作业接口"""
-
-    @abstractmethod
-    def is_stop(self) -> bool:
-        """检查作业是否停止"""
-        pass
+@dataclass
+class Message:
+    """消息模型"""
+    model: AiModel
+    task: Task
+    payload: List[Payload]
+    sentence: Sentence
 
 
 class AiTranslateService:
     """AI翻译服务"""
 
-    def __init__(self, app_url: str = "http://localhost:8000"):
+    def __init__(self, redis, ch, method, api_url, customer_timeout):
         self.queue = 'ai_translate'
         self.model_token = None
         self.task = None
-        self.redis_clusters = RedisClusters()
-        self.mq = RabbitMQService()
+        self.redis_clusters = redis[0]
+        self.redis_namespace = redis[1]
         self.api_timeout = 100
         self.llm_timeout = 300
         self.task_topic_id = None
-        self.app_url = app_url
+        self.api_url = api_url
+        self.customer_timeout = customer_timeout
+        self.channel = ch
+        self.maxProcessTime = 15 * 60  # 一个句子的最大处理时间
 
-    def process_translate(self, message_id: str, messages: List[Message], job: JobInterface) -> bool:
+    def process_translate(self, message_id: str, body: Message) -> bool:
         """处理翻译任务"""
 
-        if not messages or len(messages) == 0:
-            logger.error('message is not array')
-            return False
+        taskStartAt = int(time.time())
 
-        first = messages[0]
-        self.task = first.task
-        task_id = self.task.id
+        self.task = body.task
 
-        self.redis_clusters.put(f"/task/{task_id}/message_id", message_id)
-        pointer_key = f"/task/{task_id}/pointer"
+        self.redis_clusters.set(
+            f"{self.redis_namespace}/task/{self.task.id}/message_id", message_id)
+        pointer_key = f"{self.redis_namespace}/task/{message_id}/pointer"
         pointer = 0
 
-        if self.redis_clusters.has(pointer_key):
+        if self.redis_clusters.exists(pointer_key):
             # 回到上次中断的点
             pointer = int(self.redis_clusters.get(pointer_key))
             logger.info(f"last break point {pointer}")
+            if pointer >= len(body.payload):
+                self.redis_clusters.delete(pointer_key)
+                return True
 
         # 获取model token
-        self.model_token = first.model.token
-        logger.debug(f'{self.queue} ai assistant token: {self.model_token}')
+        self.model_token = body.model.token
 
         self._set_task_status(self.task.id, 'running')
 
         # 设置task discussion topic
-        self.task_topic_id = self._task_discussion(
-            self.task.id,
-            'task',
-            self.task.title,
-            self.task.category,
-            None
-        )
-
-        for i in range(pointer, len(messages)):
-            # 获取当前内存使用量(Python版本的内存监控)
-            try:
-                import psutil
-                process = psutil.Process()
-                memory_info = process.memory_info()
-                logger.debug(
-                    f"memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")
-            except ImportError:
-                logger.debug(
-                    "psutil not installed, skipping memory monitoring")
-
-            if job.is_stop():
-                logger.info(f"收到退出信号 pointer={i}")
-                return False
+        taskTopicKey = f'{self.redis_namespace}/message/{message_id}/topic'
+        if self.redis_clusters.exists(taskTopicKey):
+            # 获取上次的task topic id
+            self.task_topic_id = self.redis_clusters.get(taskTopicKey)
+        else:
+            self.task_topic_id = self._task_discussion(
+                self.task.id,
+                'task',
+                self.task.title,
+                self.task.category,
+                None
+            )
+        times = [self.maxProcessTime]
+        # breakpoint()
+        for i in range(pointer, len(body.payload)):
+            startAt = int(time.time())
 
             # 检测停止标记的工具函数需要实现
             # if Tools.is_stop():
             #     return False
 
-            self.redis_clusters.put(pointer_key, i)
-            message = messages[i]
+            message = body.payload[i]
             task_discussion_content = []
 
             # 推理
-            try:
-                response_llm = self._request_llm(message)
-                task_discussion_content.append('- LLM request successful')
-            except RequestException as e:
-                raise e
+
+            response_llm = self._request_llm(message)
+            task_discussion_content.append('- LLM request successful')
 
             if self.task.category == 'translate':
                 # 写入句子库
                 message.sentence.content = response_llm['content']
-                try:
-                    self._save_sentence(message.sentence)
-                except Exception as e:
-                    logger.error(f'sentence error: {e}')
-                    continue
+                self._save_sentence(message.sentence)
 
             if self.task.category == 'suggest':
                 # 写入pr
-                try:
-                    self._save_pr(message.sentence, response_llm['content'])
-                except Exception as e:
-                    logger.error(f'sentence error: {e}')
-                    continue
+                self._save_pr(message.sentence, response_llm['content'])
 
             # 获取句子id
             s_uid = self._get_sentence_id(message.sentence)
 
             # 写入句子 discussion
-            topic_id = self._task_discussion(
-                s_uid,
-                'sentence',
-                self.task.title,
-                self.task.category,
-                None
-            )
-
-            if topic_id:
-                logger.info(f'{self.queue} discussion create topic successful')
-                topic_children = []
-                # 提示词
-                topic_children.append(message.prompt)
-                # 任务结果
-                topic_children.append(response_llm['content'])
-                # 推理过程写入discussion
-                if response_llm.get('reasoningContent'):
-                    topic_children.append(response_llm['reasoningContent'])
-
-                for content in topic_children:
-                    logger.debug(f'{self.queue} discussion child request')
-                    d_id = self._task_discussion(
-                        s_uid, 'sentence', self.task.title, content, topic_id)
-                    if d_id:
-                        logger.info(
-                            f'{self.queue} discussion child successful')
-            else:
-                logger.error(
-                    f'{self.queue} discussion create topic response is null')
+            topic_children = []
+            # 提示词
+            topic_children.append(message.prompt)
+            # 任务结果
+            topic_children.append(response_llm['content'])
+            # 推理过程写入discussion
+            if response_llm.get('reasoningContent'):
+                topic_children.append(response_llm['reasoningContent'])
+            self._sentence_discussion(s_uid, topic_children)
 
             # 修改task 完成度
             progress = self._set_task_progress(
-                TaskProgress(i + 1, len(messages)))
+                TaskProgress(i + 1, len(body.payload)))
             task_discussion_content.append(f"- progress={progress}")
 
             # 写入task discussion
@@ -258,19 +205,53 @@ class AiTranslateService:
             else:
                 logger.error('no task discussion root')
 
+            if i + 1 < len(body.payload):
+                self.redis_clusters.set(pointer_key, i+1)
+                # 计算本次时间和剩余时间
+                # breakpoint()
+                onceTime = int(time.time())-startAt
+                times.append(onceTime)
+                times.sort(reverse=True)
+                # 取出第一个元素
+                maxTime = times[0]
+                # 计算剩余时间
+                remain = self.customer_timeout-(int(time.time())-taskStartAt)
+                if remain < maxTime:
+                    # 时间不足
+                    raise SectionTimeout
         # 任务完成 修改任务状态为 done
-        if i + 1 == len(messages):
-            self._set_task_status(self.task.id, 'done')
-
-        self.redis_clusters.forget(pointer_key)
+        self._set_task_status(self.task.id, 'done')
+        self.redis_clusters.delete(pointer_key)
         logger.info('ai translate task complete')
         return True
 
+    def _sentence_discussion(self, id, discussions):
+        topic_id = self._task_discussion(
+            id,
+            'sentence',
+            self.task.title,
+            self.task.category,
+            None
+        )
+
+        if topic_id:
+            logger.info(f'{self.queue} discussion create topic successful')
+
+            for content in discussions:
+                logger.debug(f'{self.queue} discussion child request')
+                d_id = self._task_discussion(
+                    id, 'sentence', self.task.title, content, topic_id)
+                if d_id:
+                    logger.info(
+                        f'{self.queue} discussion child successful')
+        else:
+            logger.error(
+                f'{self.queue} discussion create topic response is null')
+
     def _set_task_status(self, task_id: str, status: str):
         """设置任务状态"""
-        url = f"{self.app_url}/api/v2/task-status/{task_id}"
+        url = f"{self.api_url}/v2/task-status/{task_id}"
         data = {'status': status}
-
         logger.debug(f'ai_translate task status request: {url}, data: {data}')
 
         headers = {'Authorization': f'Bearer {self.model_token}'}
@@ -280,16 +261,16 @@ class AiTranslateService:
         if not response.ok:
             logger.error(f'ai_translate task status error: {response.json()}')
         else:
-            logger.info('ai_translate task status done')
+            logger.info(f'ai_translate task status successful ({status})')
 
     def _save_model_log(self, token: str, data: Dict[str, Any]) -> bool:
         """保存模型日志"""
-        url = f"{self.app_url}/api/v2/model-log"
+        url = f"{self.api_url}/v2/model-log"
 
         headers = {'Authorization': f'Bearer {token}'}
         response = requests.post(
             url, json=data, headers=headers, timeout=self.api_timeout)
-
+        # breakpoint()
         if not response.ok:
             logger.error(
                 f'ai-translate model log create failed: {response.json()}')
@@ -298,7 +279,7 @@ class AiTranslateService:
 
     def _task_discussion(self, res_id: str, res_type: str, title: str, content: str, parent_id: Optional[str] = None):
         """创建任务讨论"""
-        url = f"{self.app_url}/api/v2/discussion"
+        url = f"{self.api_url}/v2/discussion"
 
         task_discussion_data = {
             'res_id': res_id,
@@ -377,15 +358,21 @@ class AiTranslateService:
                 logger.info(f'{self.queue} LLM request successful')
 
                 model_log_data.update({
+                    'request_headers': json.dumps(dict(response.request.headers), ensure_ascii=False),
                     'response_headers': json.dumps(dict(response.headers), ensure_ascii=False),
                     'status': response.status_code,
                     'response_data': json.dumps(response.json(), ensure_ascii=False),
                     'success': True
                 })
-                self._save_model_log(self.model_token, model_log_data)
                 break
-
             except requests.exceptions.RequestException as e:
+                model_log_data.update({
+                    'response_headers': json.dumps(dict(e.response.request.headers), ensure_ascii=False),
+                    'response_headers': json.dumps(dict(e.response.headers), ensure_ascii=False),
+                    'status': e.response.status_code,
+                    'response_data': json.dumps(e.response.json(), ensure_ascii=False),
+                    'success': False
+                })
                 attempt += 1
                 status = getattr(e.response, 'status_code',
                                  0) if hasattr(e, 'response') else 0
@@ -403,8 +390,14 @@ class AiTranslateService:
                 else:
                     logger.error("达到最大重试次数,请求最终失败")
                     raise e
-
-        logger.info(f'{self.queue} model log saved')
+            except Exception as e:
+                raise e
+            finally:
+                try:
+                    self._save_model_log(self.model_token, model_log_data)
+                    logger.info(f'{self.queue} model log saved')
+                except Exception as e:
+                    logger.error(e)
 
         ai_data = response.json()
         logger.debug(f'{self.queue} LLM http response: {response.json()}')
@@ -426,7 +419,7 @@ class AiTranslateService:
 
     def _save_sentence(self, sentence: Sentence):
         """写入句子库"""
-        url = f"{self.app_url}/api/v2/sentence"
+        url = f"{self.api_url}/v2/sentence"
 
         logger.info(f"{self.queue} sentence update {url}")
 
@@ -446,7 +439,7 @@ class AiTranslateService:
 
     def _save_pr(self, sentence: Sentence, content: str):
         """保存PR"""
-        url = f"{self.app_url}/api/v2/sentpr"
+        url = f"{self.api_url}/v2/sentpr"
         logger.info(f"{self.queue} sentence update {url}")
 
         data = {
@@ -477,7 +470,7 @@ class AiTranslateService:
 
     def _get_sentence_id(self, sentence: Sentence) -> str:
         """获取句子ID"""
-        url = f"{self.app_url}/api/v2/sentence-info/aa"
+        url = f"{self.api_url}/v2/sentence-info/aa"
         logger.info(f'ai translate: {url}')
 
         params = {
@@ -509,7 +502,7 @@ class AiTranslateService:
             logger.error(
                 f'{self.queue} progress total is zero, task_id: {self.task.id}')
 
-        url = f"{self.app_url}/api/v2/task/{self.task.id}"
+        url = f"{self.api_url}/v2/task/{self.task.id}"
         data = {'progress': progress}
 
         logger.debug(

+ 182 - 0
ai-translate/ai_translate/decode_dataclass.py

@@ -0,0 +1,182 @@
+import json
+import dataclasses
+from dataclasses import dataclass, fields, is_dataclass
+from typing import Type, get_type_hints, get_origin, get_args, Union, List, Dict, Any, Optional
+from types import SimpleNamespace
+# 方法1: 使用递归反射的通用解码器
+
+
+def decode_dataclass(cls, data):
+    """通用的dataclass解码器,支持嵌套结构"""
+    if not is_dataclass(cls):
+        return data
+
+    if not isinstance(data, dict):
+        raise ValueError(f"Expected dict for {cls.__name__}, got {type(data)}")
+
+    # 获取类型提示
+    type_hints = get_type_hints(cls)
+    field_values = {}
+
+    for field in fields(cls):
+        field_name = field.name
+        field_type = type_hints.get(field_name, field.type)
+
+        if field_name not in data:
+            if field.default != dataclasses.MISSING:
+                field_values[field_name] = field.default
+            elif field.default_factory != dataclasses.MISSING:
+                field_values[field_name] = field.default_factory()
+            else:
+                raise ValueError(f"Missing required field: {field_name}")
+            continue
+
+        field_value = data[field_name]
+        field_values[field_name] = _decode_field_value(field_type, field_value)
+    output = cls(**field_values)
+    breakpoint()
+    return output
+
+
+def _decode_field_value(field_type, value):
+    """解码单个字段值"""
+    # 处理None值
+    if value is None:
+        return None
+
+    # 获取类型的origin(如List, Dict等)
+    origin = get_origin(field_type)
+    args = get_args(field_type)
+
+    # 处理Optional类型 (Union[T, None])
+    if origin is Union:
+        # 检查是否是Optional类型
+        if len(args) == 2 and type(None) in args:
+            non_none_type = args[0] if args[1] is type(None) else args[1]
+            return _decode_field_value(non_none_type, value)
+        else:
+            # 其他Union类型,尝试第一个类型
+            return _decode_field_value(args[0], value)
+
+    # 处理List类型
+    if origin is list or origin is List:
+        if not isinstance(value, list):
+            raise ValueError(f"Expected list, got {type(value)}")
+        element_type = args[0] if args else Any
+        return [_decode_field_value(element_type, item) for item in value]
+
+    # 处理Dict类型
+    if origin is dict or origin is Dict:
+        if not isinstance(value, dict):
+            raise ValueError(f"Expected dict, got {type(value)}")
+        value_type = args[1] if len(args) > 1 else Any
+        return {k: _decode_field_value(value_type, v) for k, v in value.items()}
+
+    # 处理dataclass类型
+    if is_dataclass(field_type):
+        return decode_dataclass(field_type, value)
+
+    # 基础类型直接返回
+    return value
+
+# 方法4: 专门处理对象数组的函数
+
+
+def decode_dataclass_list(cls, data_list):
+    """将JSON对象数组解码为dataclass列表"""
+    if not isinstance(data_list, list):
+        raise ValueError(f"Expected list, got {type(data_list)}")
+
+    return [decode_dataclass(cls, item) for item in data_list]
+
+
+# 方法5: 扩展通用解码器支持顶层列表
+def decode_json_to_type(target_type, json_data):
+    """更通用的JSON解码器,支持顶层数组"""
+    origin = get_origin(target_type)
+    args = get_args(target_type)
+
+    # 处理List[SomeDataClass]类型
+    if origin is list or origin is List:
+        if not isinstance(json_data, list):
+            raise ValueError(
+                f"Expected list for {target_type}, got {type(json_data)}")
+
+        element_type = args[0] if args else Any
+        return [decode_json_to_type(element_type, item) for item in json_data]
+
+    # 处理单个dataclass
+    if is_dataclass(target_type):
+        return decode_dataclass(target_type, json_data)
+
+    # 其他类型直接返回
+    return json_data
+
+
+def ns_to_dataclass(ns: Any, dataclass_type: Type[Any], field_mapping: Dict[str, str] = None) -> Any:
+    """
+    将 SimpleNamespace 对象转换为 dataclass 实例,支持嵌套结构和列表。
+
+    Args:
+        ns: 要转换的 SimpleNamespace 对象或其他值。
+        dataclass_type: 目标 dataclass 类型。
+        field_mapping: 可选的字段映射字典,键为 SimpleNamespace 属性名,值为 dataclass 字段名。
+
+    Returns:
+        转换后的 dataclass 实例或其他原始值。
+    """
+    if field_mapping is None:
+        field_mapping = {}
+
+    # 如果不是 SimpleNamespace,直接返回原始值
+    if not isinstance(ns, SimpleNamespace):
+        return ns
+
+    # 获取 dataclass 的字段信息
+    if not is_dataclass(dataclass_type) and not isinstance(dataclass_type, type):
+        raise ValueError(f"{dataclass_type} 不是有效的 dataclass 类型")
+
+    dc_fields = {f.name: f.type for f in fields(
+        dataclass_type)} if is_dataclass(dataclass_type) else {}
+
+    # 将 SimpleNamespace 转为字典
+    ns_dict = vars(ns)
+    result_dict = {}
+
+    # 遍历 SimpleNamespace 的属性
+    for ns_key, value in ns_dict.items():
+        # 应用字段映射(如果有)
+        dc_key = field_mapping.get(ns_key, ns_key)
+
+        if dc_key not in dc_fields:
+            continue  # 忽略 dataclass 中不存在的字段
+
+        # 获取 dataclass 字段的类型
+        field_type = dc_fields.get(dc_key)
+
+        # 处理嵌套的 SimpleNamespace
+        if isinstance(value, SimpleNamespace):
+            result_dict[dc_key] = ns_to_dataclass(
+                value, field_type, field_mapping)
+
+        # 处理列表
+        elif isinstance(value, list) and field_type:
+            origin_type = get_origin(field_type)
+            if origin_type is list:
+                item_type = get_args(field_type)[
+                    0] if get_args(field_type) else Any
+                result_dict[dc_key] = [
+                    ns_to_dataclass(item, item_type, field_mapping)
+                    if isinstance(item, SimpleNamespace)
+                    else item
+                    for item in value
+                ]
+            else:
+                result_dict[dc_key] = value
+
+        # 直接赋值其他类型
+        else:
+            result_dict[dc_key] = value
+
+    # 创建 dataclass 实例
+    return dataclass_type(**result_dict)

+ 43 - 2
ai-translate/ai_translate/worker.py

@@ -1,7 +1,48 @@
 import logging
+from .ai_translate import AiTranslateService, SectionTimeout, Message, Sentence, AiModel, Task, TaskProgress
+from .decode_dataclass import ns_to_dataclass, decode_dataclass, decode_json_to_type, decode_dataclass_list
+from typing import List
 
 logger = logging.getLogger(__name__)
 
 
-def handle_message(context, id, content_type, body):
-    logger.info("TODO: --- using redis namespace(%s) ---", context[1])
+class TaskFailException(Exception):
+    def __init__(self, message="task fail"):
+        self.message = message
+        super().__init__(self.message)
+
+
+def handle_message(redis, ch, method, id, content_type, body, api_url, customer_timeout):
+    maxRetry = 3
+    try:
+        logger.info("process message start (%s) messages", len(body.payload))
+        consumer = AiTranslateService(
+            redis, ch, method, api_url, customer_timeout)
+        messages = ns_to_dataclass([body], Message)
+        consumer.process_translate(id, messages[0])
+        ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息
+    except SectionTimeout as e:
+        # 时间到了,活还没干完 NACK 并重新入队
+        ch.basic_nack(delivery_tag=method.delivery_tag,
+                      requeue=True)
+        pass
+    except Exception as e:
+        # retry
+        retryKey = f'{redis[1]}/message/retry/{id}'
+        retry = 0
+        if redis[0].exists(retryKey):
+            retry = redis[0].get(retryKey)
+        if retry > maxRetry:
+            logger.error(f'超过最大重试次数[{maxRetry}],任务失败')
+            # NACK 丢弃或者进入死信队列
+            ch.basic_nack(delivery_tag=method.delivery_tag,
+                          requeue=False)
+            raise TaskFailException
+        retry = retry+1
+        redis[0].set(retryKey, retry)
+        # NACK 并重新入队
+        logger.warning(f'消息处理错误,重新压入队列 [{retry}/{maxRetry}]')
+        ch.basic_nack(delivery_tag=method.delivery_tag,
+                      requeue=True)
+        logger.error(f"error: {e}")
+        logger.exception("发生异常")

+ 0 - 50
ai-translate/ai_translate/worker2.py

@@ -1,50 +0,0 @@
-import logging
-import ai_translate
-import os
-import sys
-import logging
-from typing import List
-from ai_translate import (
-    AiTranslateService,
-    JobInterface
-)
-
-logger = logging.getLogger(__name__)
-
-
-class MockJob(JobInterface):
-    """模拟作业类,用于测试"""
-
-    def __init__(self):
-        self._stop = False
-
-    def is_stop(self) -> bool:
-        """检查作业是否停止"""
-        return self._stop
-
-    def stop(self):
-        """停止作业"""
-        self._stop = True
-
-
-def handle_message(message_id, content_type, body):
-    logger.info(f"process message start len:{len(body)}")
-    try:
-        # 创建服务实例
-        service = AiTranslateService(app_url="http://localhost:8000")
-
-        # 创建作业实例
-        job = MockJob()
-
-        # 处理翻译任务
-        result = service.process_translate(message_id, body, job)
-
-        if result:
-            logger.info("翻译任务完成成功!")
-        else:
-            logger.error("翻译任务失败!")
-
-        return result
-
-    except Exception as e:
-        logger.error(f"测试过程中发生错误: {str(e)}")

+ 1 - 0
api-v12/.gitignore

@@ -25,3 +25,4 @@ Thumbs.db
 
 composer.lock
 package-lock.json
+yarn.lock

+ 7 - 5
api-v8/app/Http/Controllers/TaskStatusController.php

@@ -188,20 +188,22 @@ class TaskStatusController extends Controller
                 ->select('assignee_id')->get();
             $aiAssistant = AiModel::whereIn('uid', $taskAssignee)->first();
             if ($aiAssistant) {
+                $aiTask = Task::find($taskId);
                 try {
-                    $ai = app(AiTranslateService::class);
-                    $params = $ai->makeByTask($taskId, $aiAssistant->uid, true);
-                    //\App\Jobs\AiTranslate::dispatch(['message_id' => Str::uuid(), 'payload' => $params]);
-                    $aiTask = Task::find($taskId);
+                    //$ai = app(AiTranslateService::class);
+                    //$params = $ai->makeByTask($taskId, $aiAssistant->uid, true);
+                    \App\Jobs\ProcessAITranslateJob::publish($taskId, $aiAssistant->uid);
                     $aiTask->executor_id = $aiAssistant->uid;
                     $aiTask->status = 'queue';
-                    $aiTask->save();
                     $this->pushChange('queue', $taskId);
                 } catch (\Exception $e) {
+                    $aiTask->status = 'pending';
                     Log::error('ai assistant start fail', [
                         'task' => $taskId,
                         'error' => $e->getMessage()
                     ]);
+                } finally {
+                    $aiTask->save();
                 }
             }
         }

+ 16 - 1
api-v8/app/Jobs/ProcessAITranslateJob.php

@@ -3,11 +3,13 @@
 namespace App\Jobs;
 
 use App\Services\AiTranslateService;
+use App\Services\RabbitMQService;
 use Illuminate\Support\Facades\Log;
 
 class ProcessAITranslateJob extends BaseRabbitMQJob
 {
     private $aiService;
+
     protected function processMessage(array $messageData)
     {
         $startTime = microtime(true);
@@ -39,5 +41,18 @@ class ProcessAITranslateJob extends BaseRabbitMQJob
         $this->aiService->stop();
     }
 
-    public static function publish(AiTranslateService $ai) {}
+    public static function publish(string $taskId, $aiAssistantId)
+    {
+        $us = ['openai.com', 'googleapis.com', 'x.ai', 'anthropic.com'];
+        $data = AiTranslateService::makeByTask($taskId, $aiAssistantId);
+        $mq = app(RabbitMQService::class);
+        $queue = 'ai_translate_cn';
+        $found = array_filter($us, function ($value) use ($data) {
+            return str_contains($data['model']['url'], $value);
+        });
+        if (count($found) > 0) {
+            $queue = 'ai_translate_us';
+        }
+        $mq->publishMessage($queue, $data);
+    }
 }

+ 7 - 6
api-v8/app/Services/AiTranslateService.php

@@ -483,7 +483,7 @@ class AiTranslateService
      * @param  string  $taskId 任务uuid
      * @return array 拆解后的提示词数组
      */
-    public function makeByTask(string $taskId, $aiAssistantId, bool $send = true)
+    public static function makeByTask(string $taskId, $aiAssistantId)
     {
         $task = Task::findOrFail($taskId);
         $description = $task->description;
@@ -637,11 +637,12 @@ class AiTranslateService
             ];
             array_push($mqData, $aiMqData);
         }
-        if ($send) {
-            $mq = app(RabbitMQService::class);
-            $mq->publishMessage('ai_translate', $mqData);
-        }
-        return $mqData;
+        $output = [
+            'model' => $aiModel->toArray(),
+            'task' => $task,
+        ];
+        $output['payload'] = $mqData;
+        return $output;
     }
     public function stop()
     {

+ 45 - 30
api-v8/app/Services/RabbitMQService.php

@@ -47,7 +47,13 @@ class RabbitMQService
     {
         $queueConfig = config("mint.rabbitmq.queues.{$queueName}");
 
-
+        $workerArgs = [];
+        if (isset($queueConfig['ttl'])) {
+            $workerArgs['x-message-ttl'] =  $queueConfig['ttl'];
+        }
+        if (isset($queueConfig['max_length'])) {
+            $workerArgs['x-max-length'] =  $queueConfig['max_length'];
+        }
 
         // 创建死信交换机
         if (isset($queueConfig['dead_letter_exchange'])) {
@@ -87,59 +93,68 @@ class RabbitMQService
                 $queueConfig['dead_letter_exchange']
             );
 
-            // 创建主队列,配置死信
-            $arguments = new AMQPTable([
-                'x-dead-letter-exchange' => $queueConfig['dead_letter_exchange'],
-                'x-dead-letter-routing-key' => $queueConfig['dead_letter_queue'], // 死信路由键
-            ]);
-        } else {
-            $workerArgs = [];
-            if (isset($queueConfig['ttl'])) {
-                $workerArgs['x-message-ttl'] =  $queueConfig['ttl'];
-            }
-            if (isset($queueConfig['max_length'])) {
-                $workerArgs['x-max-length'] =  $queueConfig['max_length'];
-            }
-            $arguments = new AMQPTable($workerArgs);
+            // 主队列,配置死信
+            $workerArgs['x-dead-letter-exchange'] =  $queueConfig['dead_letter_exchange'];
+            $workerArgs['x-dead-letter-routing-key'] =  $queueConfig['dead_letter_queue'];
         }
+        $arguments = new AMQPTable($workerArgs);
 
-
-        $this->channel->queue_declare(
-            $queueName,
-            false,  // passive
-            true,   // durable
-            false,  // exclusive
-            false,  // auto_delete
-            false,  // nowait
-            $arguments
-        );
+        // 检查队列是否存在
+        try {
+            $this->channel->queue_declare(
+                $queueName,
+                true, // passive
+                true,   // durable
+                false, // exclusive
+                false, // auto_delete
+                false,  // nowait
+                $arguments
+            );
+            //存在,不用创建
+        } catch (\Exception $e) {
+            if (strpos($e->getMessage(), 'NOT_FOUND') !== false) {
+                Log::info("Queue $queueName does not exist.");
+                // 队列不存在,创建新队列
+                $this->channel->queue_declare(
+                    $queueName,
+                    false,  // passive
+                    true,   // durable
+                    false,  // exclusive
+                    false,  // auto_delete
+                    false,  // nowait
+                    $arguments
+                );
+            } else {
+                Log::error("Error checking queue $queueName: {$e->getMessage()}");
+                throw $e;
+            }
+        }
     }
 
     public function publishMessage(string $queueName, array $data): bool
     {
         try {
             $this->setupQueue($queueName);
-
+            $msgId = Str::uuid();
             $message = new AMQPMessage(
                 json_encode($data, JSON_UNESCAPED_UNICODE),
                 [
                     'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                     'timestamp' => time(),
-                    'message_id' => Str::uuid(),
+                    'message_id' => $msgId,
                     "content_type" => 'application/json; charset=utf-8'
                 ]
             );
 
             $this->channel->basic_publish($message, '', $queueName);
 
-            Log::info("Message published to queue: {$queueName}", $data);
+            Log::info("Message published to queue: {$queueName} msg id={$msgId}");
             return true;
         } catch (\Exception $e) {
             Log::error("Failed to publish message to queue: {$queueName}", [
                 'error' => $e->getMessage(),
-                'data' => $data
             ]);
-            return false;
+            throw $e;
         }
     }
 

+ 8 - 1
api-v8/config/mint.php

@@ -126,7 +126,14 @@ return [
     ],
     'rabbitmq' => [
         'queues' => [
-            'ai_translate' => [
+            'ai_translate_cn' => [
+                'retry_times' => env('RABBITMQ_AI_RETRY_TIMES', 3),
+                'max_loop_count' => env('RABBITMQ_AI_MAX_LOOP', 10),
+                'timeout' => env('RABBITMQ_AI_TIMEOUT', 300),
+                'dead_letter_queue' => 'ai_translate_dlq',
+                'dead_letter_exchange' => 'ai_translate_dlx',
+            ],
+            'ai_translate_us' => [
                 'retry_times' => env('RABBITMQ_AI_RETRY_TIMES', 3),
                 'max_loop_count' => env('RABBITMQ_AI_MAX_LOOP', 10),
                 'timeout' => env('RABBITMQ_AI_TIMEOUT', 300),