# service.py (20 KB)

  1. import json
  2. import time
  3. import logging
  4. from typing import List, Dict, Any, Optional
  5. from datetime import datetime
  6. from dataclasses import dataclass
  7. import time
  8. import requests
  9. from .utils import is_stopped
  # Module-level logger, named after this module so log records can be
  # filtered per module by the application's logging configuration.
  logger = logging.getLogger(__name__)
  class DatabaseException(Exception):
      """Error raised when a write through the backend database API fails.

      The text is stored on ``self.message`` and is also handed to
      ``Exception`` so that ``str(exc)`` yields the same text.
      """

      def __init__(self, message="database api access exception"):
          super().__init__(message)
          self.message = message
  class SectionTimeout(Exception):
      """Error signalling that a section (fragment) timed out.

      The default message is the Chinese text for "section timed out". The
      text is stored on ``self.message`` and passed on to ``Exception``.
      """

      def __init__(self, message="片段超时"):
          super().__init__(message)
          self.message = message
  class TaskFailException(Exception):
      """Error signalling that a task failed.

      The text is stored on ``self.message`` and passed on to ``Exception``.
      """

      def __init__(self, message="task fail"):
          super().__init__(message)
          self.message = message
  class LLMFailException(Exception):
      """Error raised when an LLM request fails and must not be retried.

      The text is stored on ``self.message`` and passed on to ``Exception``.
      """

      def __init__(self, message="LLM request fail"):
          super().__init__(message)
          self.message = message
  @dataclass
  class TaskProgress:
      """Progress counters of a task."""

      # Number of items finished so far.
      current: int
      # Number of items the task consists of.
      total: int
  @dataclass
  class TaskInfo:
      """Descriptive fields of a task."""

      # Task identifier.
      id: str
      # Human-readable task title.
      title: str
      # Task category, e.g. 'translate' or 'suggest'.
      category: str
      # Free-text description of the task.
      description: str
      # Current task status.
      status: str
  @dataclass
  class Task:
      """A task: its descriptive info plus its progress counters.

      Code in this module reads ``task.id``, ``task.title`` and
      ``task.category`` directly on the task object, while the dataclass only
      stores them inside ``info``. The read-only properties below delegate to
      ``info`` so both access styles work.
      """

      # Identity and descriptive fields of the task.
      info: TaskInfo
      # Completion counters of the task.
      progress: TaskProgress

      @property
      def id(self) -> str:
          """Task identifier, taken from ``info``."""
          return self.info.id

      @property
      def title(self) -> str:
          """Task title, taken from ``info``."""
          return self.info.title

      @property
      def category(self) -> str:
          """Task category, taken from ``info``."""
          return self.info.category
  @dataclass
  class AiModel:
      """Configuration of the AI model used for a request."""

      # Identifier of the model record.
      uid: str
      # Model name sent to the LLM endpoint.
      model: str
      # URL of the LLM endpoint.
      url: str
      # API key used to authenticate against the LLM endpoint.
      key: str
      # Token used to authenticate against the backend API.
      token: str
      # Optional system prompt for the conversation.
      system_prompt: Optional[str] = None
  @dataclass
  class Sentence:
      """A sentence, addressed by book, paragraph, word range and channel."""

      # Book the sentence belongs to.
      book_id: str
      # Paragraph number inside the book.
      paragraph: int
      # First word position of the sentence.
      word_start: int
      # Last word position of the sentence.
      word_end: int
      # Channel the sentence text belongs to.
      channel_uid: str
      # Sentence text.
      content: str
      # Format of ``content``.
      content_type: str = 'markdown'
      # Optional access token. NOTE(review): its consumer is not visible in
      # this file - confirm against the backend API.
      access_token: Optional[str] = None
  @dataclass
  class Payload:
      """One unit of work inside a message: a prompt for a single sentence."""

      # Model configuration to use for this item.
      model: AiModel
      # Task this item belongs to.
      task: Task
      # User prompt sent to the LLM.
      prompt: str
      # Sentence the LLM result is written to.
      sentence: Sentence
  @dataclass
  class Message:
      """A queue message: shared model/task data plus the items to process."""

      # Model configuration shared by the message.
      model: AiModel
      # Task the message belongs to.
      task: Task
      # Work items, processed in list order.
      payload: List[Payload]
      # Sentence attached to the message itself.
      sentence: Sentence
  class AiTranslateService:
      """Worker-side service that runs one AI translation task from the queue.

      For every payload item of a message the service asks the configured LLM
      for a result, stores it through the backend REST API (as a sentence for
      'translate' tasks, as a suggestion/PR for 'suggest' tasks), writes
      discussion entries and updates the task progress. Redis holds the id of
      the message handling the task, the resume pointer and the id of the task
      discussion topic, so an interrupted task can continue where it stopped.
      """

      # HTTP statuses treated as client errors: repeating the request cannot help.
      _NO_RETRY_STATUSES = (400, 401, 403, 404, 422)
      # Total number of LLM request attempts before giving up.
      _MAX_LLM_ATTEMPTS = 3

      def __init__(self, redis, ch, method, api_url, openai_proxy, customer_timeout, worker_name: str):
          """Store the collaborators and settings of the service.

          Args:
              redis: Pair ``(client, namespace)``; the namespace prefixes every
                  Redis key built by this service.
              ch: Queue channel; stored, not used by the methods of this class.
              method: Delivery information; accepted for interface
                  compatibility and not stored.
              api_url: Base URL of the backend REST API.
              openai_proxy: URL of a proxy that forwards LLM requests, or a
                  falsy value to call the model URL directly.
              customer_timeout: Stored, not used by the methods of this class.
              worker_name: Name written into the task discussion topic.
          """
          self.queue = 'ai_translate'
          self.model_token = None
          self.task = None
          self.redis_clusters = redis[0]
          self.redis_namespace = redis[1]
          self.api_timeout = 100  # timeout for backend API requests
          self.llm_timeout = 300  # timeout for LLM requests
          self.task_topic_id = None
          self.api_url = api_url
          self.customer_timeout = customer_timeout
          self.channel = ch
          # Maximum processing time for one sentence (15 * 60, presumably seconds).
          self.maxProcessTime = 15 * 60
          self.openai_proxy = openai_proxy
          self.worker_name = worker_name

      # ------------------------------------------------------------------
      # small helpers
      # ------------------------------------------------------------------
      def _auth_headers(self) -> Dict[str, str]:
          """Return the bearer-token header used for every backend API call."""
          return {'Authorization': f'Bearer {self.model_token}'}

      @staticmethod
      def _as_text(value):
          """Decode ``bytes`` (as Redis clients may return) to ``str``."""
          if isinstance(value, (bytes, bytearray)):
              return value.decode('utf-8')
          return value

      @staticmethod
      def _json_or_empty(response) -> dict:
          """Return the JSON object of ``response``, or ``{}`` if there is none."""
          try:
              parsed = response.json()
          except ValueError:
              # Error pages and empty bodies are not JSON.
              return {}
          return parsed if isinstance(parsed, dict) else {}

      @classmethod
      def _response_data(cls, response) -> dict:
          """Return the ``data`` object of a JSON response, or ``{}``."""
          data = cls._json_or_empty(response).get('data')
          return data if isinstance(data, dict) else {}

      @staticmethod
      def _progress_percent(done: int, total: int) -> int:
          """Return ``done / total`` as a whole percentage; 100 if total <= 0."""
          return int(done * 100 / total) if total > 0 else 100

      @staticmethod
      def _failure_log_fields(error) -> Dict[str, Any]:
          """Build the model-log fields that describe a failed LLM request.

          ``error.response`` is ``None`` when no HTTP response was received
          (timeout, connection error), so every access is guarded. Identity
          checks are used on purpose: an error ``Response`` is falsy.
          """
          response = getattr(error, 'response', None)
          request = getattr(response, 'request', None)
          if request is None:
              request = getattr(error, 'request', None)
          request_headers = {}
          if request is not None:
              request_headers = dict(getattr(request, 'headers', None) or {})
          if response is None:
              return {
                  'request_headers': json.dumps(request_headers, ensure_ascii=False),
                  'response_headers': json.dumps({}, ensure_ascii=False),
                  'status': 0,
                  'response_data': str(error),
                  'success': False,
              }
          return {
              'request_headers': json.dumps(request_headers, ensure_ascii=False),
              'response_headers': json.dumps(dict(response.headers), ensure_ascii=False),
              'status': response.status_code,
              'response_data': response.text,
              'success': False,
          }

      def _store_model_log(self, data: Dict[str, Any]) -> None:
          """Save the model log without letting a logging failure escape."""
          try:
              if self._save_model_log(self.model_token, data):
                  logger.info(f'{self.queue} model log saved')
          except Exception as e:
              # Best effort: the model log must never hide the LLM outcome.
              logger.error(e)

      # ------------------------------------------------------------------
      # main entry point
      # ------------------------------------------------------------------
      def process_translate(self, message_id: str, body: "Message") -> bool:
          """Process every payload item of ``body`` and mark the task done.

          Args:
              message_id: Id of the queue message being handled.
              body: Message carrying the model, the task and the payload items.

          Returns:
              ``True`` when the task finished, or when the stored resume
              pointer already lies past the last payload item.

          Raises:
              Whatever ``is_stopped`` (presumably raises on a stop request -
              confirm in ``.utils``), the LLM request or the database writes
              raise; the resume pointer is kept so a retry continues with the
              interrupted item.
          """
          is_stopped()
          self.task = body.task
          task_prefix = f"{self.redis_namespace}/task/{self.task.id}"
          self.redis_clusters.set(f"{task_prefix}/message_id", message_id)
          pointer_key = f"{task_prefix}/pointer"
          total = len(body.payload)

          pointer = 0
          stored_pointer = self.redis_clusters.get(pointer_key)
          if stored_pointer is not None:
              # Go back to the point of the last interruption.
              pointer = int(stored_pointer)
              logger.info(f"last break point {pointer}")
          if pointer >= total:
              self.redis_clusters.delete(pointer_key)
              return True

          # Token used for every backend API call.
          self.model_token = body.model.token
          self._set_task_status(self.task.id, 'running')

          # Find or create the root topic of the task discussion.
          topic_key = f'{self.redis_namespace}/message/{message_id}/topic'
          stored_topic = self.redis_clusters.get(topic_key)
          if stored_topic is not None:
              # Reuse the topic created by an earlier attempt.
              self.task_topic_id = self._as_text(stored_topic)
          else:
              self.task_topic_id = self._task_discussion(
                  self.task.id,
                  'task',
                  self.task.title,
                  f'id:{message_id} worker:{self.worker_name}',
                  None
              )
              if self.task_topic_id:
                  # Remember the topic so that a retry does not open a new one.
                  self.redis_clusters.set(topic_key, self.task_topic_id)

          for i in range(pointer, total):
              is_stopped()
              # Record the item being worked on; after an interruption the
              # next attempt resumes with exactly this item.
              self.redis_clusters.set(pointer_key, i)
              item = body.payload[i]
              notes = []

              # Inference.
              response_llm = self._request_llm(item)
              notes.append('- LLM request successful')
              content = response_llm['content']

              if self.task.category == 'translate':
                  # Write the result into the sentence store.
                  item.sentence.content = content
                  self._save_sentence(item.sentence)
              elif self.task.category == 'suggest':
                  # Write the result as a suggestion (PR).
                  self._save_pr(item.sentence, content)

              # The sentence id is needed to attach the sentence discussion.
              s_uid = self._get_sentence_id(item.sentence)
              if s_uid:
                  # Task result first, then the reasoning when the model gave one.
                  topic_children = [content]
                  if response_llm.get('reasoningContent'):
                      topic_children.append(response_llm['reasoningContent'])
                  self._sentence_discussion(s_uid, item.prompt, topic_children)

              # Update the completion percentage of the task.
              progress = self._set_task_progress(TaskProgress(i + 1, total))
              notes.append(f"- progress={progress}")

              # Report this item in the task discussion.
              if self.task_topic_id:
                  self._task_discussion(
                      self.task.id,
                      'task',
                      self.task.title,
                      '\n'.join(notes),
                      self.task_topic_id
                  )
              else:
                  logger.error('no task discussion root')

          # All items processed: mark the task done and drop the resume state.
          self._set_task_status(self.task.id, 'done')
          self.redis_clusters.delete(pointer_key)
          self.redis_clusters.delete(topic_key)
          logger.info('ai translate task complete')
          return True

      def _sentence_discussion(self, id, prompt, discussions):
          """Create a sentence discussion topic and one reply per entry.

          Args:
              id: Sentence id the discussion is attached to.
              prompt: Text of the topic (the prompt sent to the LLM).
              discussions: Texts posted as children of the topic, in order.
          """
          topic_id = self._task_discussion(
              id, 'sentence', self.task.title, prompt, None)
          if not topic_id:
              logger.error(
                  f'{self.queue} discussion create topic response is null')
              return
          logger.info(f'{self.queue} discussion create topic successful')
          for content in discussions:
              logger.debug(f'{self.queue} discussion child request')
              child_id = self._task_discussion(
                  id, 'sentence', self.task.title, content, topic_id)
              if child_id:
                  logger.info(f'{self.queue} discussion child successful')

      def _set_task_status(self, task_id: str, status: str):
          """Set the status of a task through the backend API.

          A failed update is logged, not raised.
          """
          url = f"{self.api_url}/v2/task-status/{task_id}"
          data = {'status': status}
          logger.debug(f'ai_translate task status request: {url}, data: {data}')
          response = requests.patch(
              url, json=data, headers=self._auth_headers(), timeout=self.api_timeout)
          if response.ok:
              logger.info(f'ai_translate task status successful ({status})')
          else:
              logger.error(
                  f'ai_translate task status update fail. response: {response.text}')

      def _save_model_log(self, token: str, data: Dict[str, Any]) -> bool:
          """Post one model-log record; return ``True`` when it was accepted."""
          url = f"{self.api_url}/v2/model-log"
          headers = {'Authorization': f'Bearer {token}'}
          response = requests.post(
              url, json=data, headers=headers, timeout=self.api_timeout)
          if response.ok:
              return True
          logger.error(
              f'ai-translate model log create failed: {response.text}')
          return False

      def _task_discussion(self, res_id: str, res_type: str, title: str, content: str, parent_id: Optional[str] = None):
          """Create a discussion entry and return its id.

          Args:
              res_id: Id of the resource the discussion belongs to.
              res_type: Resource type, e.g. 'task' or 'sentence'.
              title: Title; only sent when a new topic is created.
              content: Markdown body of the entry.
              parent_id: Id of the topic to reply to; ``None`` creates a topic.

          Returns:
              The id of the created entry, or ``False`` when the request
              failed or the response carried no id.
          """
          url = f"{self.api_url}/v2/discussion"
          task_discussion_data = {
              'res_id': res_id,
              'res_type': res_type,
              'content': content,
              'content_type': 'markdown',
              'type': 'discussion',
              'notification': False,
          }
          if parent_id:
              task_discussion_data['parent'] = parent_id
          else:
              task_discussion_data['title'] = title
          logger.info(f'{self.queue} discussion create: {url},')
          response = requests.post(
              url, json=task_discussion_data, headers=self._auth_headers(),
              timeout=self.api_timeout)
          if not response.ok:
              # Log the raw text: an error body is not guaranteed to be JSON.
              logger.error(
                  f'{self.queue} discussion create error: {response.text}')
              return False
          return self._response_data(response).get('id') or False

      def _request_llm(self, message: "Payload") -> Dict[str, Any]:
          """Send one prompt to the LLM and return its answer.

          The request goes to ``self.openai_proxy`` when one is configured,
          otherwise straight to the model URL. Every attempt is recorded in
          the model log.

          Args:
              message: Payload item providing the model settings and prompt.

          Returns:
              ``{'content': ...}``, plus ``'reasoningContent'`` when the
              response carries a non-empty ``reasoning_content``.

          Raises:
              LLMFailException: The endpoint answered with a client error
                  (400, 401, 403, 404, 422); these are not retried.
              requests.exceptions.RequestException: The last attempt failed.
          """
          param = {
              "model": message.model.model,
              "messages": [
                  {"role": "system", "content": message.model.system_prompt or ''},
                  {"role": "user", "content": message.prompt},
              ],
              "temperature": 0.7,
              "stream": False,
          }
          logger.info(
              f'{self.queue} LLM request {message.model.url} model: {param["model"]}')

          # Record written to the model log after every attempt.
          model_log_data = {
              'model_id': message.model.uid,
              'request_at': datetime.now().isoformat(),
              'request_data': json.dumps(param, ensure_ascii=False),
          }

          headers = {'Authorization': f'Bearer {message.model.key}'}
          if self.openai_proxy:
              target_url = self.openai_proxy
              request_body = {
                  "open_ai_url": message.model.url,
                  "api_key": message.model.key,
                  'payload': param,
              }
          else:
              target_url = message.model.url
              request_body = param

          attempt = 0
          while True:
              try:
                  response = requests.post(
                      target_url,
                      json=request_body,
                      headers=headers,
                      timeout=self.llm_timeout
                  )
                  response.raise_for_status()
                  ai_data = response.json()
                  logger.info(f'{self.queue} LLM request successful')
                  # NOTE(review): the logged request headers include the
                  # Authorization header - confirm that storing it is intended.
                  model_log_data.update({
                      'request_headers': json.dumps(dict(response.request.headers), ensure_ascii=False),
                      'response_headers': json.dumps(dict(response.headers), ensure_ascii=False),
                      'status': response.status_code,
                      'response_data': json.dumps(ai_data, ensure_ascii=False),
                      'success': True
                  })
                  break
              except requests.exceptions.RequestException as e:
                  attempt += 1
                  failure = self._failure_log_fields(e)
                  model_log_data.update(failure)
                  status = failure['status']
                  # Client errors will not succeed on a second try.
                  if status in self._NO_RETRY_STATUSES:
                      logger.warning(f"客户端错误,不重试: {status}")
                      raise LLMFailException() from e
                  # Server and network errors are retried.
                  if attempt >= self._MAX_LLM_ATTEMPTS:
                      logger.error("达到最大重试次数,请求最终失败")
                      raise
                  delay = 2 ** attempt  # exponential backoff
                  logger.warning(f"请求失败(第 {attempt} 次),{delay} 秒后重试...")
                  time.sleep(delay)
              finally:
                  # Runs once per attempt, on success and on failure.
                  self._store_model_log(model_log_data)

          answer = ai_data['choices'][0]['message']
          response_content = answer['content']
          reasoning_content = answer.get('reasoning_content')
          output = {'content': response_content}
          logger.debug(f'{self.queue} LLM response content={response_content}')
          if reasoning_content:
              logger.debug(f'{self.queue} reasoning={reasoning_content}')
              output['reasoningContent'] = reasoning_content
          else:
              logger.debug(f'{self.queue} no reasoningContent')
          return output

      def _save_sentence(self, sentence: "Sentence"):
          """Write a sentence to the sentence store.

          Raises:
              DatabaseException: The backend rejected the write.
          """
          url = f"{self.api_url}/v2/sentence"
          logger.info(f"{self.queue} sentence update {url}")
          # Every attribute of the sentence object is sent as one record.
          data = {'sentences': [vars(sentence)]}
          response = requests.post(
              url, json=data, headers=self._auth_headers(), timeout=self.api_timeout)
          if not response.ok:
              logger.error(
                  f'{self.queue} sentence update failed: {url}, data: {response.text}')
              raise DatabaseException("sentence 数据库写入错误")
          count = self._response_data(response).get('count')
          logger.info(f"{self.queue} sentence update {count} successful")

      def _save_pr(self, sentence: "Sentence", content: str):
          """Store ``content`` as a suggestion (PR) for ``sentence``.

          Raises:
              DatabaseException: The backend rejected the write.
          """
          url = f"{self.api_url}/v2/sentpr"
          logger.info(f"{self.queue} sentence update {url}")
          data = {
              'book': sentence.book_id,
              'para': sentence.paragraph,
              'begin': sentence.word_start,
              'end': sentence.word_end,
              'channel': sentence.channel_uid,
              'text': content,
              'notification': False,
              'webhook': False,
          }
          response = requests.post(
              url, json=data, headers=self._auth_headers(), timeout=self.api_timeout)
          if not response.ok:
              logger.error(
                  f'{self.queue} sentence update failed: {url}, data: {response.text}')
              raise DatabaseException("pr 数据库写入错误")
          result = self._json_or_empty(response)
          if result.get('ok'):
              logger.info(f"{self.queue} sentence suggest update successful")
          else:
              logger.error(
                  f"{self.queue} sentence suggest update failed: {url}, data: {result}")

      def _get_sentence_id(self, sentence: "Sentence"):
          """Look up the id of a sentence.

          Returns:
              The sentence id, or ``False`` when the lookup failed for any
              reason (the failure is logged, never raised).
          """
          try:
              url = f"{self.api_url}/v2/sentence-info/aa"
              logger.info(f'ai translate: {url}')
              params = {
                  'book': sentence.book_id,
                  'par': sentence.paragraph,
                  'start': sentence.word_start,
                  'end': sentence.word_end,
                  'channel': sentence.channel_uid
              }
              response = requests.get(
                  url, params=params, headers=self._auth_headers(),
                  timeout=self.api_timeout)
              result = response.json()
              if not result.get('ok'):
                  logger.error(
                      f'{self.queue} sentence id error: {response.text}')
                  return False
              s_uid = result['data']['id']
              logger.debug(f"sentence id={s_uid}")
              return s_uid
          except Exception as e:
              # Deliberately best effort: a missing id only skips the
              # sentence discussion, it must not fail the task.
              logger.error(f"error: {e}")
              return False

      def _set_task_progress(self, current: "TaskProgress") -> int:
          """Report the completion percentage of the current task.

          Returns:
              The percentage that was sent (100 when ``current.total`` is not
              positive). A failed update is logged, not raised.
          """
          progress = self._progress_percent(current.current, current.total)
          if current.total <= 0:
              logger.error(
                  f'{self.queue} progress total is zero, task_id: {self.task.id}')
          url = f"{self.api_url}/v2/task/{self.task.id}"
          data = {'progress': progress}
          logger.debug(
              f'{self.queue} task progress request: {url}, data: {data}')
          response = requests.patch(
              url, json=data, headers=self._auth_headers(), timeout=self.api_timeout)
          if response.ok:
              reported = self._response_data(response).get('progress')
              logger.info(
                  f'{self.queue} task progress successful progress={reported}')
          else:
              logger.error(
                  f'{self.queue} task progress error: {response.text}')
          return progress

      def handle_failed(self, message_id: str, message: str, exception: Exception):
          """React to a final failure: stop the task and report the error.

          Errors raised while reporting are logged and swallowed.
          """
          try:
              # Mark the task as failed.
              self._set_task_status(self.task.id, 'stop')
              # Write the failure details into the task discussion.
              if self.task_topic_id:
                  error_message = f"**任务处理失败** 请重启任务 \n- message id={message_id} \n- 错误信息:{message} \n- 异常:{str(exception)}"
                  self._task_discussion(
                      self.task.id,
                      'task',
                      '任务处理失败',
                      error_message,
                      self.task_topic_id
                  )
          except Exception as e:
              logger.error(f'处理失败ai任务时出错: {str(e)}')

      def handle_retry(self, message_id: str, message: str, exception: Exception):
          """React to a failure that will be retried: pause and report it.

          Errors raised while reporting are logged and swallowed.
          """
          try:
              self._set_task_status(self.task.id, 'pause')
              # Write the failure details into the task discussion.
              if self.task_topic_id:
                  error_message = f"任务处理出错 正在重试 \n- message id={message_id} \n- 错误信息:{message} \n- 异常:{str(exception)}"
                  self._task_discussion(
                      self.task.id,
                      'task',
                      '任务处理出错',
                      error_message,
                      self.task_topic_id
                  )
          except Exception as e:
              logger.error(f'处理失败ai任务时出错: {str(e)}')

      def handle_complete(self):
          """Post a completion note to the task discussion, if there is one.

          Errors raised while posting are logged and swallowed.
          """
          try:
              if self.task_topic_id:
                  self._task_discussion(
                      self.task.id,
                      'task',
                      '任务处理完成',
                      '任务处理完成',
                      self.task_topic_id
                  )
          except Exception as e:
              logger.error(f'处理任务完成时出错: {str(e)}')

      def get_task_id(self) -> str:
          """Return the id of the task currently held by the service."""
          return self.task.id