2
0

20211207052900_sent_block_copy.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. <?php
  2. /*
  3. 迁移 sentence库
  4. 从旧数据表中提取数据插入到新的表
  5. 插入时用uuid判断是否曾经插入
  6. 曾经插入就不插入了
  7. */
  8. require_once __DIR__."/../../../public/app/config.php";
  9. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  10. set_exception_handler(function($e){
  11. fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
  12. fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
  13. fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
  14. exit;
  15. });
  16. $start = time();
  17. # 雪花id
  18. $snowflake = new SnowFlakeId();
  19. # 更新索引表
  20. $src_db = _SQLITE_DB_SENTENCE_;#源数据库
  21. $src_table = _SQLITE_TABLE_SENTENCE_BLOCK_;#源表名
  22. $dest_db = _PG_DB_SENTENCE_;#目标数据库
  23. $dest_table = _PG_TABLE_SENTENCE_BLOCK_;#目标表名
  24. fwrite(STDOUT,"migarate sent_block ".PHP_EOL);
  25. #打开源数据库
  26. $PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  27. $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  28. fwrite(STDOUT,"open src table".PHP_EOL);
  29. #打开目标数据库
  30. $PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  31. $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  32. fwrite(STDOUT,"open dest table".PHP_EOL);
  33. $queryInsert = "INSERT INTO ".$dest_table." (
  34. id,
  35. uid,
  36. parent_uid ,
  37. book_id,
  38. paragraph,
  39. owner_uid,
  40. lang,
  41. author,
  42. editor_uid,
  43. status,
  44. create_time,
  45. modify_time,
  46. created_at,
  47. updated_at)
  48. VALUES ( ? , ? , ? , ? , ? ,? ,? ,? ,? ,? ,? ,?,to_timestamp(?),to_timestamp(?))";
  49. $commitData = [];
  50. $allInsertCount = 0;
  51. $allSrcCount = 0;
  52. $count = 0;
  53. #从源数据表中读取
  54. $query = "SELECT * FROM ".$src_table;
  55. $stmtSrc = $PDO_SRC->prepare($query);
  56. $stmtSrc->execute();
  57. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  58. $allSrcCount++;
  59. #插入目标表
  60. if(substr($srcData["book"],0,1)==="p"){
  61. $srcData["book"] = (int)substr($srcData["book"],1);
  62. }
  63. if(strlen($srcData["id"])>10 && strlen($srcData["owner"])>30){
  64. $uuid = $srcData["id"];
  65. #查询目标表中是否有相同数据
  66. $queryExsit = "SELECT id FROM ".$dest_table." WHERE uid = ? ";
  67. $getExist = $PDO_DEST->prepare($queryExsit);
  68. $getExist->execute(array($uuid));
  69. $exist = $getExist->fetch(PDO::FETCH_ASSOC);
  70. if(!$exist){
  71. #没有相同数据
  72. if(strlen($srcData["editor"])>36){
  73. fwrite(STDERR,"error: {$uuid} editor {$srcData["editor"]} is too long".PHP_EOL);
  74. continue;
  75. }
  76. if(empty($srcData["lang"]) ){
  77. $srcData["lang"]="zh-hans";
  78. }
  79. if(empty($srcData["status"]) ){
  80. $srcData["status"]=10;
  81. }
  82. if(empty($srcData["book"]) || !is_numeric($srcData["book"])){
  83. fwrite(STDERR,"book is error id=".$uuid.PHP_EOL);
  84. fputcsv($fpError,$srcData);
  85. continue;
  86. }
  87. if(empty($srcData["paragraph"]) || !is_numeric($srcData["paragraph"])){
  88. fwrite(STDERR,"paragraph is error id=".$uuid.PHP_EOL);
  89. fputcsv($fpError,$srcData);
  90. continue;
  91. }
  92. if(empty($srcData["modify_time"]) || $srcData["modify_time"]<1532590551000){
  93. $srcData["modify_time"]=1532590551000;
  94. }
  95. $commitData[] = array(
  96. $snowflake->id(),
  97. $uuid,
  98. $srcData["parent_id"],
  99. $srcData["book"],
  100. $srcData["paragraph"],
  101. $srcData["owner"],
  102. $srcData["lang"],
  103. $srcData["author"],
  104. $srcData["editor"],
  105. $srcData["status"],
  106. $srcData["modify_time"],
  107. $srcData["modify_time"],
  108. $srcData["modify_time"]/1000,
  109. $srcData["modify_time"]/1000
  110. );
  111. $count++;
  112. $allInsertCount++;
  113. }
  114. if($count ==10000){
  115. #10000行插入一次
  116. // 开始一个事务,关闭自动提交
  117. $PDO_DEST->beginTransaction();
  118. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  119. foreach ($commitData as $key => $value) {
  120. try{
  121. $stmtDEST->execute($value);
  122. }catch(PDOException $e){
  123. fwrite(STDERR,$e->getMessage().PHP_EOL);
  124. fwrite(STDERR,implode(',',$value).PHP_EOL);
  125. continue;
  126. }
  127. }
  128. // 提交更改
  129. $PDO_DEST->commit();
  130. $commitData = [];
  131. fwrite(STDOUT,"finished $count".PHP_EOL) ;
  132. $count=0;
  133. }
  134. }
  135. }
  136. if($count>0){
  137. #最后的没有到10000的数据插入
  138. $PDO_DEST->beginTransaction();
  139. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  140. foreach ($commitData as $key => $value) {
  141. $stmtDEST->execute($value);
  142. }
  143. // 提交更改
  144. $PDO_DEST->commit();
  145. $commitData = [];
  146. echo "finished $count".PHP_EOL;
  147. }
  148. echo "insert done $allInsertCount in $allSrcCount ".PHP_EOL;