| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678 |
- <?php
- namespace App\Services;
- use Illuminate\Support\Facades\Log;
- use Illuminate\Support\Facades\Http;
- use Illuminate\Http\Client\RequestException;
- use Illuminate\Support\Facades\Cache;
- use App\Models\Task;
- use App\Models\PaliText;
- use App\Models\PaliSentence;
- use App\Models\AiModel;
- use App\Models\Sentence;
- use App\Http\Api\ChannelApi;
- use App\Http\Controllers\AuthController;
- use App\Http\Api\MdRender;
- use App\Exceptions\SectionTimeoutException;
- use App\Exceptions\TaskFailException;
/**
 * Thrown by this service when a write through the internal sentence or
 * suggestion API comes back with a failure status.
 */
class DatabaseException extends \Exception
{
}
- class AiTranslateService
- {
/** @var string Queue name; inside this class it only prefixes log messages. */
private $queue = 'ai_translate_v2';

/** @var string|null Bearer token for the internal API calls; read from the first message's model. */
private $modelToken = null;

/** @var object|null Task info read from the first message in processTranslate(). */
private $task = null;

/** @var mixed Not referenced anywhere in this class. */
protected $mq;

/** @var int Timeout passed to the HTTP client for internal API calls. */
private $apiTimeout = 30;

/** @var int Timeout passed to the HTTP client for the LLM request. */
private $llmTimeout = 300;

/** @var mixed Id of the task-level discussion topic, or false when it could not be created. */
private $taskTopicId;

/** @var bool Set by stop(); checked at the top of every loop iteration in processTranslate(). */
private $stop = false;

/** @var int Maximum processing time for a single sentence (compared against time() differences). */
private $maxProcessTime = 15 * 60;

/** @var int Time budget that processTranslate() compares against the elapsed time. */
private $mqTimeout = 60;

/** @var string|null Optional proxy endpoint for LLM requests; set through setProxy(). */
private $openaiProxy = null;
/**
 * The service needs no construction-time setup.
 */
public function __construct()
{
}
/**
 * Route LLM requests through a proxy endpoint.
 *
 * @param string $proxy URL that requestLLM() posts to instead of the model's own URL
 * @return self this instance, so calls can be chained
 */
public function setProxy(string $proxy): self
{
    $this->openaiProxy = $proxy;

    return $this;
}
/**
 * Run the sentence jobs of one task through the LLM and store the results.
 *
 * For every message the LLM is called, the answer is saved (as a sentence for
 * "translate" tasks, as a suggestion for "suggest" tasks), prompt and answer
 * are posted to the sentence's discussion, and the task progress is updated.
 * The index of the message being processed is kept in the cache so that a
 * later call for the same task starts from there.
 *
 * @param string $messageId id of the MQ message being handled
 * @param array  $messages  list of job objects; each one is read through ->model,
 *                          ->task, ->prompt and ->sentence (presumably the decoded
 *                          payload built by makeByTask() — TODO confirm)
 * @return bool true when the loop ran to its end, false on empty input or a stop request
 * @throws SectionTimeoutException when the remaining time budget is too small for another sentence
 * @throws TaskFailException       when requestLLM() gives up on a sentence
 */
public function processTranslate(string $messageId, array $messages): bool
{
    $start = time();
    $total = count($messages);
    if ($total === 0) {
        Log::error('messages is empty');
        return false;
    }

    $first = $messages[0];
    $this->task = $first->task->info;
    $taskId = $this->task->id;
    Cache::put("/task/{$taskId}/message_id", $messageId);

    // Go back to the point where the previous run was interrupted.
    // NOTE(review): the stored value is the index of the message *being* processed,
    // so a rerun repeats that message unless the caller advances the pointer — confirm.
    $pointerKey = "/task/{$taskId}/pointer";
    $pointer = 0;
    if (Cache::has($pointerKey)) {
        // Cast: a cache store may hand the number back as a string, which would
        // defeat the strict comparison against count() after the loop.
        $pointer = (int) Cache::get($pointerKey);
        Log::info("last break point {$pointer}");
    }

    // Model token used for every internal API call. Never log the secret itself.
    $this->modelToken = $first->model->token;
    Log::debug($this->queue . ' ai assistant token', ['token_present' => !empty($this->modelToken)]);

    $this->setTaskStatus($this->task->id, 'running');

    // Root topic of the task-level discussion.
    $this->taskTopicId = $this->taskDiscussion(
        $this->task->id,
        'task',
        $this->task->title,
        $this->task->category,
        null
    );

    // Longest duration seen so far, seeded with the per-sentence maximum.
    $longest = $this->maxProcessTime;

    for ($i = $pointer; $i < $total; $i++) {
        Log::debug("memory usage: " . memory_get_usage(true) / 1024 / 1024 . " MB");
        Log::debug("memory peak usage: " . memory_get_peak_usage(true) / 1024 / 1024 . " MB");

        if ($this->stop) {
            Log::info("收到退出信号 pointer={$i}");
            return false;
        }
        if (\App\Tools\Tools::isStop()) {
            // Global stop marker detected.
            return false;
        }

        Cache::put($pointerKey, $i);
        $message = $messages[$i];
        $taskDiscussionContent = [];

        // Inference.
        $responseLLM = $this->requestLLM($message);
        $taskDiscussionContent[] = '- LLM request successful';

        // Persist the answer; a failed write skips the rest of this sentence.
        try {
            if ($this->task->category === 'translate') {
                $message->sentence->content = $responseLLM['content'];
                $this->saveSentence($message->sentence);
            }
            if ($this->task->category === 'suggest') {
                $this->savePr($message->sentence, $responseLLM['content']);
            }
        } catch (\Exception $e) {
            Log::error('sentence', ['message' => $e]);
            continue;
        }

        // Sentence-level discussion: one topic, then prompt / answer / reasoning as replies.
        $sUid = $this->getSentenceId($message->sentence);
        $topicId = false;
        if ($sUid) {
            $topicId = $this->taskDiscussion(
                $sUid,
                'sentence',
                $this->task->title,
                $this->task->category,
                null
            );
        } else {
            // Without a sentence id there is nothing to attach the topic to.
            Log::error($this->queue . ' sentence id not found, skip sentence discussion');
        }

        if ($topicId) {
            Log::info($this->queue . ' discussion create topic successful');
            $topicChildren = [$message->prompt, $responseLLM['content']];
            if (!empty($responseLLM['reasoningContent'])) {
                $topicChildren[] = $responseLLM['reasoningContent'];
            }
            foreach ($topicChildren as $content) {
                Log::debug($this->queue . ' discussion child request', ['data' => ['parent' => $topicId]]);
                $dId = $this->taskDiscussion($sUid, 'sentence', $this->task->title, $content, $topicId);
                if ($dId) {
                    Log::info($this->queue . ' discussion child successful');
                }
            }
        } elseif ($sUid) {
            Log::error($this->queue . ' discussion create topic response is null');
        }

        // Task progress.
        $progress = $this->setTaskProgress($message->task->progress);
        $taskDiscussionContent[] = "- progress=" . $progress;

        // Task-level discussion entry for this sentence.
        if ($this->taskTopicId) {
            $this->taskDiscussion(
                $this->task->id,
                'task',
                $this->task->title,
                implode("\n", $taskDiscussionContent),
                $this->taskTopicId
            );
        } else {
            Log::error('no task discussion root');
        }

        // Is there enough time left for one more sentence? Only relevant when
        // another sentence exists; otherwise fall through and close the task.
        // NOTE(review): with the default budgets (mqTimeout 60, maxProcessTime 900)
        // this check trips after every processed sentence — confirm that is intended.
        if ($i + 1 < $total) {
            $elapsed = time() - $start;
            $longest = max($longest, $elapsed);
            if ($this->mqTimeout - $elapsed < $longest) {
                throw new SectionTimeoutException;
            }
        }
    }

    // Every message handled: mark the task as done and drop the resume pointer.
    if ($i === $total) {
        $this->setTaskStatus($this->task->id, 'done');
        Cache::forget($pointerKey);
        Log::info('ai translate task complete');
    }

    return true;
}
/**
 * Report a new status for a task through the internal task-status API.
 *
 * A failed response is only logged; the method returns nothing.
 *
 * @param mixed  $taskId id appended to the task-status URL
 * @param string $status value sent as the "status" field
 */
private function setTaskStatus($taskId, $status)
{
    $endpoint = config('app.url') . '/api/v2/task-status/' . $taskId;
    $payload = ['status' => $status];
    Log::debug('ai_translate task status request', ['url' => $endpoint, 'data' => $payload]);

    $reply = Http::timeout($this->apiTimeout)
        ->withToken($this->modelToken)
        ->patch($endpoint, $payload);

    // Check the status code.
    if (!$reply->failed()) {
        Log::info('ai_translate task status done');
        return;
    }

    Log::error('ai_translate task status error', ['data' => $reply->json()]);
}
/**
 * Post one model-log entry to the internal model-log API.
 *
 * @param string|null $token bearer token for the request
 * @param array       $data  log entry, sent as the request body
 * @return bool false when the API answered with a failure status, true otherwise
 */
private function saveModelLog($token, $data)
{
    $endpoint = config('app.url') . '/api/v2/model-log';
    $reply = Http::timeout($this->apiTimeout)
        ->withToken($token)
        ->post($endpoint, $data);

    $saved = !$reply->failed();
    if (!$saved) {
        Log::error('ai-translate model log create failed', ['data' => $reply->json()]);
    }

    return $saved;
}
/**
 * Create a discussion entry (a root topic or a reply) through the internal API.
 *
 * @param mixed      $resId    id of the resource the discussion belongs to
 * @param string     $resType  resource type; this class passes 'task' or 'sentence'
 * @param string     $title    topic title; only sent when no parent is given
 * @param string     $content  markdown body of the entry
 * @param mixed|null $parentId id of the parent topic; when set the entry is a reply
 * @return mixed id of the created entry, or false when the request failed or
 *               the response carries no data.id
 */
private function taskDiscussion($resId, $resType, $title, $content, $parentId = null)
{
    $endpoint = config('app.url') . '/api/v2/discussion';

    $entry = [
        'res_id' => $resId,
        'res_type' => $resType,
        'content' => $content,
        'content_type' => 'markdown',
        'type' => 'discussion',
        'notification' => false,
    ];
    // A reply carries its parent id; only a root topic carries a title.
    $entry += $parentId ? ['parent' => $parentId] : ['title' => $title];

    Log::debug($this->queue . ' discussion create', ['url' => $endpoint, 'data' => json_encode($entry)]);

    $reply = Http::timeout($this->apiTimeout)
        ->withToken($this->modelToken)
        ->post($endpoint, $entry);

    if ($reply->failed()) {
        Log::error($this->queue . ' discussion create error', ['data' => $reply->json()]);
        return false;
    }

    $decoded = $reply->json();
    Log::debug($this->queue . ' discussion create', ['data' => json_encode($decoded)]);

    return $decoded['data']['id'] ?? false;
}
/**
 * Send one prompt to the LLM endpoint and return its answer.
 *
 * Every HTTP response, successful or not, is recorded through saveModelLog().
 * Each attempt gets its own log entry, so a failure flag from an earlier
 * attempt cannot leak into the entry of a later successful one.
 * Responses with status 400/401/403/404/422 fail immediately. Other failed
 * responses and connection failures are retried, up to three attempts in
 * total, sleeping 2 and then 4 seconds in between.
 *
 * @param object $message job object; reads ->prompt and ->model
 *                        (model, url, key, uid, system_prompt)
 * @return array ['content' => ...] plus 'reasoningContent' when the model supplied one
 * @throws TaskFailException when the endpoint answers with a non-retryable status,
 *                           keeps failing, or returns no message content
 * @throws \Illuminate\Http\Client\ConnectionException when the endpoint is still
 *                           unreachable on the last attempt
 */
private function requestLLM($message)
{
    $param = [
        "model" => $message->model->model,
        "messages" => [
            ["role" => "system", "content" => $message->model->system_prompt ?? ''],
            ["role" => "user", "content" => $message->prompt],
        ],
        "temperature" => 0.3, // low randomness, to keep the output accurate
        "top_k" => 20, // limit the range of candidate tokens
        "stream" => false
    ];

    if ($this->openaiProxy) {
        // The proxy receives the real endpoint and key wrapped around the payload.
        $requestUrl = $this->openaiProxy;
        $body = [
            'open_ai_url' => $message->model->url,
            'api_key' => $message->model->key,
            'payload' => $param,
        ];
    } else {
        $requestUrl = $message->model->url;
        $body = $param;
    }

    Log::info($this->queue . ' LLM request ' . $message->model->url . ' model:' . $param['model']);
    Log::debug($this->queue . ' LLM api request', [
        'url' => $message->model->url,
        'data' => json_encode($param),
    ]);

    // Fields shared by every model-log entry of this request.
    $baseLog = [
        'model_id' => $message->model->uid,
        'request_at' => now(),
        'request_data' => json_encode($param, JSON_UNESCAPED_UNICODE),
    ];

    $maxRetries = 3;
    $nonRetryable = [400, 401, 403, 404, 422];
    $response = null;

    for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
        // Only the LLM call itself sits inside the try block, so a problem while
        // writing the model log can never trigger another (billable) LLM request.
        $failure = null;
        try {
            $response = Http::withToken($message->model->key)
                ->timeout($this->llmTimeout)
                ->post($requestUrl, $body);
            // Throws a RequestException for 4xx / 5xx status codes.
            $response->throw();
        } catch (RequestException $e) {
            $failure = $e;
        } catch (\Illuminate\Http\Client\ConnectionException $e) {
            $failure = $e;
        }

        if ($failure === null) {
            Log::info($this->queue . ' LLM request successful');
            $this->saveModelLog($this->modelToken, $baseLog + [
                'request_headers' => json_encode($response->handlerStats(), JSON_UNESCAPED_UNICODE),
                'response_headers' => json_encode($response->headers(), JSON_UNESCAPED_UNICODE),
                'status' => $response->status(),
                'response_data' => json_encode($response->json(), JSON_UNESCAPED_UNICODE),
            ]);
            break;
        }

        Log::error($this->queue . ' LLM request exception: ' . $failure->getMessage());

        if ($failure instanceof RequestException) {
            $failResponse = $failure->response;
            $status = $failResponse->status();
            $this->saveModelLog($this->modelToken, $baseLog + [
                'request_headers' => json_encode($failResponse->handlerStats(), JSON_UNESCAPED_UNICODE),
                'response_headers' => json_encode($failResponse->headers(), JSON_UNESCAPED_UNICODE),
                'status' => $status,
                'response_data' => $failResponse->body(),
                'success' => false,
            ]);

            // Some errors are pointless to retry.
            if (in_array($status, $nonRetryable, true)) {
                Log::warning("客户端错误,不重试: {$status}\n");
                throw new TaskFailException;
            }
        }

        if ($attempt >= $maxRetries) {
            Log::error("达到最大重试次数,请求最终失败\n");
            // A connection failure keeps its own type, as it had before retries existed.
            throw ($failure instanceof RequestException) ? new TaskFailException : $failure;
        }

        // Server or network error: retry with exponential back-off.
        $delay = pow(2, $attempt);
        Log::warning("请求失败(第 {$attempt} 次),{$delay} 秒后重试...\n");
        sleep($delay);
    }

    Log::info($this->queue . ' model log saved');

    $aiData = $response->json();
    Log::debug($this->queue . ' LLM http response', ['data' => $aiData]);

    // A 2xx answer without a message body cannot be used as a translation.
    if (!isset($aiData['choices'][0]['message']['content'])) {
        Log::error($this->queue . ' LLM response has no message content');
        throw new TaskFailException;
    }
    $responseContent = $aiData['choices'][0]['message']['content'];
    $reasoningContent = $aiData['choices'][0]['message']['reasoning_content'] ?? null;

    $output = ['content' => $responseContent];
    Log::debug($this->queue . ' LLM response content=' . $responseContent);

    if (empty($reasoningContent)) {
        Log::debug($this->queue . ' no reasoningContent');
    } else {
        Log::debug($this->queue . ' reasoning=' . $reasoningContent);
        $output['reasoningContent'] = $reasoningContent;
    }

    return $output;
}
/**
 * Write one sentence to the sentence store through the internal API.
 *
 * @param object $sentence sentence record, posted as the single element of "sentences"
 * @throws DatabaseException when the API answers with a failure status
 */
private function saveSentence($sentence)
{
    $url = config('app.url') . '/api/v2/sentence';
    Log::info($this->queue . " sentence update {$url}");

    $reply = Http::timeout($this->apiTimeout)
        ->withToken($this->modelToken)
        ->post($url, ['sentences' => [$sentence]]);

    if ($reply->failed()) {
        Log::error($this->queue . ' sentence update failed', [
            'url' => $url,
            'data' => $reply->json(),
        ]);
        throw new DatabaseException("sentence 数据库写入错误");
    }

    // The API reports how many sentences it wrote.
    $count = $reply->json()['data']['count'];
    Log::info("{$this->queue} sentence update {$count} successful");
}
/**
 * Submit the LLM answer as a suggestion for a sentence through the internal API.
 *
 * @param object $sentence sentence the suggestion belongs to; reads book_id, paragraph,
 *                         word_start, word_end and channel_uid
 * @param string $content  suggested text
 * @throws DatabaseException when the API answers with a failure status
 */
private function savePr($sentence, $content)
{
    $url = config('app.url') . '/api/v2/sentpr';
    Log::info($this->queue . " sentence update {$url}");

    $suggestion = [
        'book' => $sentence->book_id,
        'para' => $sentence->paragraph,
        'begin' => $sentence->word_start,
        'end' => $sentence->word_end,
        'channel' => $sentence->channel_uid,
        'text' => $content,
        'notification' => false,
        'webhook' => false,
    ];
    $reply = Http::timeout($this->apiTimeout)
        ->withToken($this->modelToken)
        ->post($url, $suggestion);

    if ($reply->failed()) {
        Log::error($this->queue . ' sentence update failed', [
            'url' => $url,
            'data' => $reply->json(),
        ]);
        throw new DatabaseException("pr 数据库写入错误");
    }

    // A 2xx answer can still report a failure in its "ok" flag; that case is only logged.
    if (!$reply->json()['ok']) {
        Log::error("{$this->queue} sentence suggest update failed", [
            'url' => $url,
            'data' => $reply->json(),
        ]);
        return;
    }

    Log::info("{$this->queue} sentence suggest update successful");
}
/**
 * Look up the id of a sentence through the internal sentence-info API.
 *
 * A failed request, a body that cannot be decoded, a falsy "ok" flag and a
 * missing data.id are all treated alike: the problem is logged and false is
 * returned, so the caller has a single failure value to check.
 *
 * @param object $sentence reads book_id, paragraph, word_start, word_end and channel_uid
 * @return mixed the sentence id, or false when it could not be determined
 */
private function getSentenceId($sentence)
{
    $url = config('app.url') . '/api/v2/sentence-info/aa';
    Log::info('ai translate', ['url' => $url]);

    $response = Http::timeout($this->apiTimeout)->withToken($this->modelToken)->get($url, [
        'book' => $sentence->book_id,
        'par' => $sentence->paragraph,
        'start' => $sentence->word_start,
        'end' => $sentence->word_end,
        'channel' => $sentence->channel_uid
    ]);

    // json() yields null for a non-JSON body; ?? and empty() tolerate that.
    $payload = $response->json();
    $sUid = $payload['data']['id'] ?? null;

    if ($response->failed() || empty($payload['ok']) || $sUid === null) {
        Log::error($this->queue . ' sentence id error', ['data' => $payload]);
        return false;
    }

    Log::debug("sentence id={$sUid}");
    return $sUid;
}
/**
 * Convert a current/total pair into a percentage and store it on the task.
 *
 * @param object $current progress object; reads ->current and ->total
 * @return int percentage sent to the API; 100 when total is not positive
 */
private function setTaskProgress($current)
{
    if ($current->total > 0) {
        $progress = (int)($current->current * 100 / $current->total);
    } else {
        // Avoid dividing by zero: report the task as complete and flag the odd input.
        Log::error($this->queue . ' progress total is zero', ['task_id' => $this->task->id]);
        $progress = 100;
    }

    $endpoint = config('app.url') . '/api/v2/task/' . $this->task->id;
    $payload = ['progress' => $progress];
    Log::debug($this->queue . ' task progress request', ['url' => $endpoint, 'data' => $payload]);

    $reply = Http::timeout($this->apiTimeout)
        ->withToken($this->modelToken)
        ->patch($endpoint, $payload);

    if (!$reply->failed()) {
        Log::info($this->queue . ' task progress successful progress=' . $reply->json()['data']['progress']);
    } else {
        Log::error($this->queue . ' task progress error', ['data' => $reply->json()]);
    }

    return $progress;
}
/**
 * Final-failure hook: set the task status to 'stop' and note the error in the
 * task discussion.
 *
 * The handler may run on an instance that never went through processTranslate(),
 * in which case no task or token is loaded yet. Both are then recovered from the
 * first element of $translateData. Exceptions raised while reporting are logged
 * and not rethrown.
 *
 * @param string     $messageId     id of the MQ message that failed
 * @param array      $translateData job list of the failed message (assumed to have the
 *                                  same shape as the $messages given to
 *                                  processTranslate() — TODO confirm)
 * @param \Exception $exception     the error that made the job fail
 */
public function handleFailedTranslate(string $messageId, array $translateData, \Exception $exception): void
{
    try {
        // ?? tolerates a missing element as well as missing properties.
        $first = $translateData[0] ?? null;
        if ($this->task === null) {
            $this->task = $first->task->info ?? null;
        }
        if ($this->modelToken === null) {
            $this->modelToken = $first->model->token ?? null;
        }
        if ($this->task === null) {
            Log::error('处理失败ai任务时出错', [
                'error' => 'task info unavailable',
                'message_id' => $messageId,
            ]);
            return;
        }

        // Business logic for a definitive failure: put the task into the 'stop' state.
        $this->setTaskStatus($this->task->id, 'stop');

        // Write the failure details to the task discussion, when a root topic is known.
        if ($this->taskTopicId) {
            $this->taskDiscussion(
                $this->task->id,
                'task',
                $this->task->title,
                "**处理失败ai任务时出错** 请重启任务 message id={$messageId} 错误信息:" . $exception->getMessage(),
                $this->taskTopicId
            );
        }
    } catch (\Exception $e) {
        Log::error('处理失败ai任务时出错', ['error' => $e->getMessage()]);
    }
}
/**
 * Read a task and split it into one LLM job per sentence.
 *
 * The task description doubles as the prompt template and as the parameter
 * source: every row containing "=" is read as "|key=value|". The keys used
 * here are type (sentence | para | chapter), id, book, paragraphs, channel
 * ("<channel id>[@<token>]"), nissaya and token.
 *
 * @param string $taskId        task uuid
 * @param mixed  $aiAssistantId primary key of the AiModel that will do the work
 * @return array|false ['model' => ..., 'task' => ..., 'area' => 'us'|'cn', 'payload' => list of jobs],
 *                     or false when a required parameter is missing or invalid
 */
public static function makeByTask(string $taskId, $aiAssistantId)
{
    $task = Task::findOrFail($taskId);
    $description = (string) $task->description;

    // Collect "key=value" rows. The limit of 2 keeps values that themselves
    // contain "=" (for example padded tokens) intact.
    $params = [];
    foreach (explode("\n", $description) as $row) {
        if (!str_contains($row, '=')) {
            continue;
        }
        [$name, $value] = explode('=', trim($row, "| \t\r"), 2);
        $params[trim($name)] = trim($value);
    }

    if (!isset($params['type'])) {
        Log::error('no $params.type');
        return false;
    }
    if (empty($params['channel'])) {
        Log::error('no $params.channel');
        return false;
    }

    // Shape shared by every entry of $sentences.
    $toEntry = static fn ($row): array => [
        'id' => [$row->book, $row->paragraph, $row->word_begin, $row->word_end],
        'strlen' => $row->length,
    ];

    // Gather the sentences the task covers.
    $sentences = [];
    switch ($params['type']) {
        case 'sentence':
            if (!isset($params['id'])) {
                Log::error('no $params.id');
                return false;
            }
            $idParts = explode('-', $params['id']);
            if (count($idParts) !== 4) {
                Log::error('invalid $params.id');
                return false;
            }
            // Same entry shape as the other cases; a single sentence counts
            // as one unit of progress.
            $sentences[] = ['id' => $idParts, 'strlen' => 1];
            break;
        case 'para':
            if (!isset($params['book']) || !isset($params['paragraphs'])) {
                Log::error('no $params.book or paragraphs');
                return false;
            }
            $rows = PaliSentence::where('book', $params['book'])
                ->where('paragraph', $params['paragraphs'])
                ->orderBy('word_begin')
                ->get();
            foreach ($rows as $row) {
                $sentences[] = $toEntry($row);
            }
            break;
        case 'chapter':
            if (!isset($params['book']) || !isset($params['paragraphs'])) {
                Log::error('no $params.book or paragraphs');
                return false;
            }
            // "paragraphs" is the first paragraph; chapter_len gives the span.
            $chapterLen = PaliText::where('book', $params['book'])
                ->where('paragraph', $params['paragraphs'])
                ->value('chapter_len');
            $rows = PaliSentence::where('book', $params['book'])
                ->whereBetween('paragraph', [$params['paragraphs'], $params['paragraphs'] + $chapterLen - 1])
                ->orderBy('paragraph')
                ->orderBy('word_begin')
                ->get();
            foreach ($rows as $row) {
                $sentences[] = $toEntry($row);
            }
            break;
        default:
            return false;
    }
    $totalLen = array_sum(array_column($sentences, 'strlen'));

    // Prompt rendering: Mustache fills the template, MdRender expands it.
    $mdRender = new MdRender([
        'format' => 'prompt',
        'footnote' => false,
        'paragraph' => false,
    ]);
    $m = new \Mustache_Engine([
        'entity_flags' => ENT_QUOTES,
        // Values are inserted verbatim, without HTML escaping.
        'escape' => static fn ($value) => $value,
    ]);

    // AI model, with a user token attached for the worker's API calls.
    $aiModel = AiModel::findOrFail($aiAssistantId);
    $aiModel['token'] = AuthController::getUserToken($aiModel->uid);

    // The channel settings are identical for every sentence, so resolve them once.
    $sentChannelInfo = explode('@', $params['channel']);
    $channelId = $sentChannelInfo[0];
    $accessToken = $sentChannelInfo[1] ?? ($params['token'] ?? null);

    $nissayaChannelId = null;
    $nissayaChannelInfo = null;
    if (!empty($params['nissaya'])) {
        $nissayaChannelId = explode('@', $params['nissaya'])[0];
        $nissayaChannelInfo = ChannelApi::getById($nissayaChannelId);
    }

    $sumLen = 0;
    $mqData = [];
    foreach ($sentences as $sentence) {
        $sumLen += $sentence['strlen'];
        $sid = implode('-', $sentence['id']);
        Log::debug($sid);

        $data = [
            'origin' => '{{' . $sid . '}}',
            'translation' => '{{sent|id=' . $sid . '|channel=' . $channelId . '|text=translation}}',
        ];

        if ($nissayaChannelInfo) {
            // Only offer the nissaya text when that sentence exists and has content.
            $nissayaSent = Sentence::where('book_id', $sentence['id'][0])
                ->where('paragraph', $sentence['id'][1])
                ->where('word_start', $sentence['id'][2])
                ->where('word_end', $sentence['id'][3])
                ->where('channel_uid', $nissayaChannelId)
                ->first();
            if ($nissayaSent && !empty($nissayaSent->content)) {
                $data['nissaya'] = [
                    'channel' => $nissayaChannelInfo,
                    'data' => '{{sent|id=' . $sid . '|channel=' . $nissayaChannelId . '|text=translation}}',
                ];
            }
        }

        $content = $m->render($description, $data);
        $prompt = $mdRender->convert($content, []);

        // One MQ job per sentence.
        $mqData[] = [
            'model' => $aiModel,
            'task' => [
                'info' => $task,
                'progress' => [
                    'current' => $sumLen,
                    'total' => $totalLen,
                ],
            ],
            'prompt' => $prompt,
            'sentence' => [
                'book_id' => $sentence['id'][0],
                'paragraph' => $sentence['id'][1],
                'word_start' => $sentence['id'][2],
                'word_end' => $sentence['id'][3],
                'channel_uid' => $channelId,
                'content' => $prompt,
                'content_type' => 'markdown',
                'access_token' => $accessToken,
            ],
        ];
    }

    $output = [
        'model' => $aiModel->toArray(),
        'task' => $task,
    ];

    // Area is 'us' when the model URL contains one of these hosts, otherwise 'cn'.
    $us = ['openai.com', 'googleapis.com', 'x.ai', 'anthropic.com'];
    $modelUrl = (string) ($output['model']['url'] ?? '');
    $found = array_filter($us, static fn ($host) => str_contains($modelUrl, $host));
    $output['area'] = $found ? 'us' : 'cn';

    $output['payload'] = $mqData;

    return $output;
}
/**
 * Ask the running processTranslate() loop to exit; the flag is checked at the
 * top of its next iteration.
 */
public function stop()
{
    $this->stop = true;
}
- }
|