20211207164600_sent_pr_copy.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. <?php
  2. /*
  3. 迁移 sentence pr 库
  4. 从旧数据表中提取数据插入到新的表
  5. 插入时用uuid判断是否曾经插入
  6. 曾经插入就不插入了
  7. */
  8. require_once __DIR__."/../../../public/app/config.php";
  9. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  10. set_exception_handler(function($e){
  11. fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
  12. fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
  13. fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
  14. exit;
  15. });
  16. $fpError = fopen(__DIR__.'/log/'.basename($_SERVER['PHP_SELF'],'.php').".err.data.csv",'w');
  17. $start = time();
  18. # 雪花id
  19. $snowflake = new SnowFlakeId();
  20. # 更新索引表
  21. $src_db=_SQLITE_DB_SENTENCE_;#源数据库
  22. $src_table=_SQLITE_TABLE_SENTENCE_PR_;#源表名
  23. $dest_db=_PG_DB_SENTENCE_;#目标数据库
  24. $dest_table=_PG_TABLE_SENTENCE_PR_;#目标表名
  25. fwrite(STDOUT,"migarate sent_pr".PHP_EOL);
  26. #打开源数据库
  27. $PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  28. $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  29. fwrite(STDOUT,"open src table".PHP_EOL);
  30. #打开目标数据库
  31. $PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  32. $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  33. fwrite(STDOUT,"open dest table".PHP_EOL);
  34. $queryInsert = "INSERT INTO ".$dest_table."
  35. (
  36. id,
  37. old_id,
  38. book_id,
  39. paragraph,
  40. word_start,
  41. word_end,
  42. channel_uid,
  43. author,
  44. editor_uid,
  45. content,
  46. language,
  47. status,
  48. strlen,
  49. create_time,
  50. modify_time,
  51. created_at,
  52. updated_at)
  53. VALUES ( ? , ? , ? , ? , ? ,? ,? ,? ,? ,? ,? ,?,?,?,?,to_timestamp(?),to_timestamp(?))";
  54. $commitData = [];
  55. $allInsertCount = 0;
  56. $allSrcCount = 0;
  57. $count = 0;
  58. #从源数据表中读取
  59. $query = "SELECT * FROM ".$src_table;
  60. $stmtSrc = $PDO_SRC->prepare($query);
  61. $stmtSrc->execute();
  62. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  63. $allSrcCount++;
  64. #插入目标表
  65. if(substr($srcData["book"],0,1)==="p"){
  66. $srcData["book"] = (int)substr($srcData["book"],1);
  67. }
  68. {
  69. $uuid = $srcData["id"];
  70. #查询目标表中是否有相同数据
  71. $queryExsit = "SELECT id FROM ".$dest_table." WHERE old_id = ? ";
  72. $getExist = $PDO_DEST->prepare($queryExsit);
  73. $getExist->execute(array($uuid));
  74. $exist = $getExist->fetch(PDO::FETCH_ASSOC);
  75. if(!$exist){
  76. #没有相同数据
  77. if(strlen($srcData["editor"])>36){
  78. fwrite(STDERR,"error: {$uuid} editor {$srcData["editor"]} too long".PHP_EOL);
  79. continue;
  80. }
  81. if(empty($srcData["language"]) ){
  82. $srcData["language"]="zh-hans";
  83. }
  84. if(empty($srcData["strlen"]) ){
  85. $srcData["strlen"]=mb_strlen($srcData["strlen"],"UTF-8");
  86. }
  87. if(empty($srcData["status"]) ){
  88. $srcData["status"]=10;
  89. }
  90. if(empty($srcData["book"]) || !is_numeric($srcData["book"])){
  91. fwrite(STDERR,"book is error id=".$uuid.PHP_EOL);
  92. fputcsv($fpError,$srcData);
  93. continue;
  94. }
  95. if(empty($srcData["paragraph"]) || !is_numeric($srcData["paragraph"])){
  96. fwrite(STDERR,"paragraph is error id=".$uuid.PHP_EOL);
  97. fputcsv($fpError,$srcData);
  98. continue;
  99. }
  100. if(empty($srcData["begin"]) || !is_numeric($srcData["begin"]) ){
  101. fwrite(STDERR,"begin is error id=".$uuid.PHP_EOL);
  102. fputcsv($fpError,$srcData);
  103. continue;
  104. }
  105. if(empty($srcData["end"]) || !is_numeric($srcData["end"]) ){
  106. fwrite(STDERR,"end is error id=".$uuid.PHP_EOL);
  107. fputcsv($fpError,$srcData);
  108. continue;
  109. }
  110. if(empty($srcData["channel"]) ){
  111. fwrite(STDERR,"end is error id=".$uuid.PHP_EOL);
  112. fputcsv($fpError,$srcData);
  113. continue;
  114. }
  115. if(empty($srcData["create_time"]) || $srcData["create_time"]<1532590551000){
  116. $srcData["create_time"]=1532590551000;
  117. }
  118. if(empty($srcData["modify_time"]) || $srcData["modify_time"]<1532590551000){
  119. $srcData["modify_time"]=1532590551000;
  120. }
  121. $commitData[] = array(
  122. $snowflake->id(),
  123. $uuid,
  124. $srcData["book"],
  125. $srcData["paragraph"],
  126. $srcData["begin"],
  127. $srcData["end"],
  128. $srcData["channel"],
  129. $srcData["author"],
  130. $srcData["editor"],
  131. $srcData["text"],
  132. $srcData["language"],
  133. $srcData["status"],
  134. $srcData["strlen"],
  135. $srcData["create_time"],
  136. $srcData["modify_time"],
  137. $srcData["create_time"]/1000,
  138. $srcData["modify_time"]/1000
  139. );
  140. $count++;
  141. $allInsertCount++;
  142. }
  143. if($count ==10000){
  144. #10000行插入一次
  145. // 开始一个事务,关闭自动提交
  146. $PDO_DEST->beginTransaction();
  147. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  148. foreach ($commitData as $key => $value) {
  149. try{
  150. $stmtDEST->execute($value);
  151. }catch(PDOException $e){
  152. fwrite(STDERR,$e->getMessage().PHP_EOL);
  153. fwrite(STDERR,implode(',',$value).PHP_EOL);
  154. exit;
  155. }
  156. }
  157. // 提交更改
  158. $PDO_DEST->commit();
  159. $commitData = [];
  160. fwrite(STDOUT, "finished $count".PHP_EOL);
  161. $count=0;
  162. }
  163. if($allSrcCount % 10000 ==0){
  164. echo "find from src table $allSrcCount / $allInsertCount is new.".PHP_EOL;
  165. }
  166. }
  167. }
  168. if($count>0){
  169. #最后的没有到10000的数据插入
  170. $PDO_DEST->beginTransaction();
  171. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  172. foreach ($commitData as $key => $value) {
  173. try{
  174. $stmtDEST->execute($value);
  175. }catch(PDOException $e){
  176. fwrite(STDERR,$e->getMessage().PHP_EOL);
  177. fwrite(STDERR,implode(',',$value).PHP_EOL);
  178. exit;
  179. }
  180. }
  181. // 提交更改
  182. $PDO_DEST->commit();
  183. $commitData = [];
  184. fwrite(STDOUT, "finished $count".PHP_EOL);
  185. }
  186. fwrite(STDOUT, "insert done $allInsertCount in $allSrcCount ".PHP_EOL);
  187. fwrite(STDOUT, "all done in ".(time()-$start)."s".PHP_EOL);
  188. fclose($fpError);