20211207052901_sent_copy.php 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  <?php
/*
 * Migrate the sentence library.
 *
 * Reads every row of the legacy (source) sentence table and inserts it into
 * the new (destination) table. Before inserting, the source row's uuid is
 * looked up in the destination `uid` column; a row that was inserted by an
 * earlier run is not inserted again.
 *
 * Rows that fail validation are reported on STDERR and, for the numeric
 * field checks, also dumped to log/<script>.err.data.csv.
 */
require_once __DIR__."/../../../public/app/config.php";
require_once __DIR__."/../../../public/app/public/snowflakeid.php";

// Report any uncaught exception on STDERR and stop with a non-zero exit
// status so that a calling shell script can detect the failed migration.
set_exception_handler(function($e){
    fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
    fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
    fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
    exit(1);
});

$start = time();

# Snowflake id generator used for the destination primary key
$snowflake = new SnowFlakeId();

# CSV file that collects the source rows rejected by the numeric field checks
$errorLogPath = __DIR__.'/log/'.basename($_SERVER['PHP_SELF'],'.php').".err.data.csv";
$fpError = fopen($errorLogPath,'w');
if($fpError === false){
    // Fail early: otherwise the first fputcsv()/fclose() call would fail much later.
    throw new RuntimeException("can not open error log file ".$errorLogPath);
}

# Tables involved in the migration
$src_db=_SQLITE_DB_SENTENCE_;       # source database
$src_table=_SQLITE_TABLE_SENTENCE_; # source table name
$dest_db=_PG_DB_SENTENCE_;          # destination database
$dest_table=_PG_TABLE_SENTENCE_;    # destination table name

# Number of rows inserted per destination transaction
$batchSize = 10000;
# A progress line is printed every time this many source rows have been read
$progressInterval = 10000;
# Lower bound applied to create_time / modify_time. The values are divided by
# 1000 before being passed to to_timestamp(), i.e. they are handled as
# milliseconds since the epoch.
$minTimestampMs = 1532590551000;

echo "migarating sentence".PHP_EOL;

# Open the source database
$PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
$PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
fwrite(STDOUT,"open src table".PHP_EOL);

# Open the destination database
$PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
$PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
fwrite(STDOUT,"open dest table".PHP_EOL);

$queryInsert = "INSERT INTO ".$dest_table." (
    id,
    uid,
    parent_uid,
    block_uid ,
    channel_uid,
    book_id,
    paragraph,
    word_start,
    word_end,
    author,
    editor_uid,
    content,
    language,
    version,
    status,
    strlen,
    create_time,
    modify_time,
    created_at,
    updated_at) VALUES (? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? ,to_timestamp(?),to_timestamp(?))";

// Both destination statements are prepared once and re-executed for every
// row instead of being prepared again inside the loops.
$stmtInsert = $PDO_DEST->prepare($queryInsert);
$stmtExist = $PDO_DEST->prepare("SELECT id FROM ".$dest_table." WHERE uid = ? ");

/*
 * Validate one source row and turn it into the positional parameter list of
 * the insert statement.
 *
 * @param array $srcData associative source row as fetched from the source table
 * @return array|null the 20 insert parameters, or null when the row is rejected
 */
$buildRow = function(array $srcData) use ($snowflake, $fpError, $minTimestampMs){
    $uuid = $srcData["id"];

    // The cast keeps strlen() quiet when the editor column is NULL.
    if(strlen((string)$srcData["editor"])>36){
        fwrite(STDERR,"error: {$uuid} editor {$srcData["editor"]} too long".PHP_EOL);
        return null;
    }

    # Defaults for missing values
    if(empty($srcData["language"])){
        $srcData["language"]="zh-hans";
    }
    if(empty($srcData["strlen"])){
        // The missing length is derived from the sentence text itself.
        $srcData["strlen"]=mb_strlen((string)$srcData["text"],"UTF-8");
    }
    if(empty($srcData["status"])){
        $srcData["status"]=10;
    }
    if(empty($srcData["ver"])){
        $srcData["ver"]=1;
    }

    # These fields must be present and numeric, otherwise the row is rejected
    foreach(array("book","paragraph","begin","end") as $field){
        if(empty($srcData[$field]) || !is_numeric($srcData[$field])){
            fwrite(STDERR,$field." is error id=".$uuid.PHP_EOL);
            fputcsv($fpError,$srcData);
            return null;
        }
    }

    # Missing, non-numeric or too old timestamps are replaced by the lower bound
    foreach(array("create_time","modify_time") as $field){
        if(empty($srcData[$field]) || !is_numeric($srcData[$field]) || $srcData[$field]<$minTimestampMs){
            $srcData[$field]=$minTimestampMs;
        }
    }

    return array(
        $snowflake->id(),
        $uuid,
        $srcData["parent"],
        $srcData["block_id"],
        $srcData["channal"],
        $srcData["book"],
        $srcData["paragraph"],
        $srcData["begin"],
        $srcData["end"],
        $srcData["author"],
        $srcData["editor"],
        $srcData["text"],
        $srcData["language"],
        $srcData["ver"],
        $srcData["status"],
        $srcData["strlen"],
        $srcData["create_time"],
        $srcData["modify_time"],
        $srcData["create_time"]/1000,
        $srcData["modify_time"]/1000
    );
};

/*
 * Insert one batch of prepared rows inside a single destination transaction.
 *
 * Every row runs under its own savepoint: PostgreSQL rejects all further
 * statements of a transaction once one statement has failed, so without the
 * savepoint a single bad row would discard the whole batch at commit time.
 * A failing row is reported on STDERR and skipped; the other rows of the
 * batch are still committed.
 *
 * @param array $rows list of parameter lists produced by $buildRow
 */
$flushBatch = function(array $rows) use ($PDO_DEST, $stmtInsert){
    $PDO_DEST->beginTransaction();
    try{
        foreach($rows as $row){
            $PDO_DEST->exec("SAVEPOINT sentence_row");
            try{
                $stmtInsert->execute($row);
            }catch(PDOException $e){
                // Undo only the failed insert and keep the transaction usable.
                $PDO_DEST->exec("ROLLBACK TO SAVEPOINT sentence_row");
                fwrite(STDERR,$e->getMessage().PHP_EOL);
                fwrite(STDERR,implode(',',$row).PHP_EOL);
            }
            $PDO_DEST->exec("RELEASE SAVEPOINT sentence_row");
        }
        // Commit the batch
        $PDO_DEST->commit();
    }catch(Exception $e){
        // Anything unexpected: leave no transaction open, then let the
        // exception handler report it.
        if($PDO_DEST->inTransaction()){
            $PDO_DEST->rollBack();
        }
        throw $e;
    }
    fwrite(STDOUT,"finished ".count($rows).PHP_EOL);
};

$commitData = [];    # rows waiting for the next batch insert
$allInsertCount = 0; # rows accepted for insertion so far
$allSrcCount = 0;    # rows read from the source table so far

# Read everything from the source table
$stmtSrc = $PDO_SRC->prepare("SELECT * FROM ".$src_table);
$stmtSrc->execute();
while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
    $allSrcCount++;

    # Only rows whose uuid is not yet in the destination table are migrated
    $stmtExist->execute(array($srcData["id"]));
    $exist = $stmtExist->fetch(PDO::FETCH_ASSOC);
    if(!$exist){
        $row = $buildRow($srcData);
        if($row !== null){
            $commitData[] = $row;
            $allInsertCount++;
            if(count($commitData) === $batchSize){
                $flushBatch($commitData);
                $commitData = [];
            }
        }
    }

    if($allSrcCount % $progressInterval == 0){
        echo "find from src table $allSrcCount / $allInsertCount is new.".PHP_EOL;
    }
}

# Insert the remaining rows that did not fill a complete batch
if(count($commitData)>0){
    $flushBatch($commitData);
}

fwrite(STDOUT,"insert done $allInsertCount in $allSrcCount ".PHP_EOL);
fwrite(STDOUT, "all done in ".(time()-$start)."s".PHP_EOL);
fclose($fpError);