20211210160700_user_dict_copy.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. <?php
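/*
Extract rows from the legacy tables and insert them into the new table.
Original design note: on insert, the uuid was meant to be used to tell whether
a row had been inserted before, and rows inserted before were to be skipped.
NOTE(review): the code in this file performs no uuid check; it deletes every
row from the destination table and inserts all rows again with freshly
generated ids. Confirm which behaviour is intended.
*/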
  7. require_once __DIR__."/../../../public/app/config.php";
  8. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  9. set_exception_handler(function($e){
  10. fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
  11. fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
  12. fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
  13. exit;
  14. });
  15. $start = time();
  16. # 雪花id
  17. $snowflake = new SnowFlakeId();
  18. $fpError = fopen(__DIR__.'/log/'.basename($_SERVER['PHP_SELF'],'.php').".err.data.csv",'w');
  19. # 更新索引表
  20. $src_db=_SQLITE_DB_WBW_;#源数据库
  21. $src_table=_SQLITE_TABLE_DICT_WBW_;#源表名
  22. $dest_db=_PG_DB_WBW_;#目标数据库
  23. $dest_table=_PG_TABLE_DICT_WBW_;#目标表名
  24. echo "migarate user dict".PHP_EOL;
  25. #打开源数据库
  26. $PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  27. $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  28. echo "open src".PHP_EOL;
  29. #打开目标数据库
  30. $PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  31. $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  32. echo "open dest".PHP_EOL;
  33. #删除目标数据表中全部数据
  34. fwrite(STDOUT,"delete dest".PHP_EOL);
  35. $query = "delete from $dest_table where true;";
  36. $stmtDest = $PDO_DEST->prepare($query);
  37. $stmtDest->execute();
  38. $queryInsert = "INSERT INTO ".$dest_table."
  39. (
  40. id,
  41. word ,
  42. type,
  43. gramma,
  44. parent,
  45. mean,
  46. note,
  47. factors,
  48. factormean,
  49. status,
  50. source,
  51. language,
  52. confidence,
  53. creator_id,
  54. ref_counter,
  55. create_time,
  56. created_at,
  57. updated_at
  58. )
  59. VALUES (? , ? , ? , ? ,? ,? ,? ,? ,? ,? ,? , ?,?,?,?,?,to_timestamp(?),to_timestamp(?))";
  60. echo "read from orginal".PHP_EOL;
  61. $commitData = [];
  62. $allInsertCount = 0;
  63. $allSrcCount = 0;
  64. $count = 0;
  65. #从源数据表中读取
  66. $query = "SELECT * FROM ".$src_table." WHERE true ";
  67. $stmtSrc = $PDO_SRC->prepare($query);
  68. $stmtSrc->execute();
  69. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  70. $allSrcCount++;
  71. #插入目标表
  72. if(empty($srcData["pali"]) ){
  73. fwrite(STDERR,"pali is null id=".$srcData["id"].PHP_EOL);
  74. fputcsv($fpError,$srcData);
  75. continue;
  76. }
  77. if(empty($srcData["creator"]) ){
  78. fwrite(STDERR,"creator is null id=".$srcData["id"].PHP_EOL);
  79. fputcsv($fpError,$srcData);
  80. continue;
  81. }
  82. $commitData[] = array(
  83. $snowflake->id(),
  84. $srcData["pali"],
  85. $srcData["type"],
  86. $srcData["gramma"],
  87. $srcData["parent"],
  88. $srcData["mean"],
  89. $srcData["note"],
  90. $srcData["factors"],
  91. $srcData["factormean"],
  92. $srcData["status"],
  93. "_SYS_USER_WBW_",
  94. $srcData["language"],
  95. $srcData["confidence"],
  96. $srcData["creator"],
  97. $srcData["ref_counter"],
  98. $srcData["time"],
  99. $srcData["time"],
  100. $srcData["time"]
  101. );
  102. $count++;
  103. $allInsertCount++;
  104. if($count ==10000){
  105. #10000行插入一次
  106. $PDO_DEST->beginTransaction();
  107. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  108. foreach ($commitData as $key => $value) {
  109. try{
  110. $stmtDEST->execute($value);
  111. }catch(PDOException $e){
  112. fwrite(STDERR,$e->getMessage().PHP_EOL);
  113. fwrite(STDERR,implode(',',$value).PHP_EOL);
  114. exit;
  115. }
  116. }
  117. // 提交更改
  118. $PDO_DEST->commit();
  119. $commitData = [];
  120. echo "finished $count".PHP_EOL;
  121. $count=0;
  122. }
  123. if($allSrcCount % 10000 ==0){
  124. echo "find from src table $allSrcCount / $allInsertCount is new.".PHP_EOL;
  125. }
  126. }
  127. if($count>0){
  128. #最后的没有到10000的数据插入
  129. $PDO_DEST->beginTransaction();
  130. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  131. foreach ($commitData as $key => $value) {
  132. $stmtDEST->execute($value);
  133. }
  134. // 提交更改
  135. $PDO_DEST->commit();
  136. $commitData = [];
  137. echo "finished $count".PHP_EOL;
  138. }
  139. echo "insert done $allInsertCount in $allSrcCount ".PHP_EOL;
  140. # 更新索引表
  141. $src_table=_SQLITE_TABLE_DICT_WBW_INDEX_;#源表名
  142. $src_word_table=_SQLITE_TABLE_DICT_WBW_;#源word表名
  143. echo "migarating usr dict index ".PHP_EOL;
  144. // 开始一个事务,关闭自动提交
  145. $commitData = [];
  146. $allInsertCount = 0;
  147. $allSrcCount = 0;
  148. $count = 0;
  149. #从源数据表中读取
  150. $query = "SELECT * FROM ".$src_table;
  151. $stmtSrc = $PDO_SRC->prepare($query);
  152. $stmtSrc->execute();
  153. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  154. $allSrcCount++;
  155. #插入目标表
  156. $wordIndex = $srcData["word_index"];
  157. #查询目标表中的数据
  158. $queryExsit = "SELECT * FROM ".$src_word_table." WHERE id = ? ";
  159. $getWord = $PDO_SRC->prepare($queryExsit);
  160. $getWord->execute(array($wordIndex));
  161. $exist = $getWord->fetch(PDO::FETCH_ASSOC);
  162. if($exist){
  163. if(empty($srcData["user_id"]) ){
  164. fwrite(STDERR,"index user_id is null id=".$srcData["id"].PHP_EOL);
  165. fputcsv($fpError,$srcData);
  166. continue;
  167. }
  168. if(empty($exist["pali"]) ){
  169. fwrite(STDERR,"pali is null id=".$srcData["id"].PHP_EOL);
  170. continue;
  171. }
  172. if(empty($exist["creator"]) ){
  173. fwrite(STDERR,"creator is null id=".$srcData["id"].PHP_EOL);
  174. continue;
  175. }
  176. $commitData[] = array(
  177. $snowflake->id(),
  178. $exist["pali"],
  179. $exist["type"],
  180. $exist["gramma"],
  181. $exist["parent"],
  182. $exist["mean"],
  183. $exist["note"],
  184. $exist["factors"],
  185. $exist["factormean"],
  186. (int)$exist["status"],
  187. "_USER_WBW_",
  188. $exist["language"],
  189. $exist["confidence"],
  190. $srcData["user_id"],
  191. $exist["ref_counter"],
  192. $exist["time"],
  193. $exist["time"],
  194. $exist["time"]
  195. );
  196. $count++;
  197. $allInsertCount++;
  198. if($count === 10000){
  199. $PDO_DEST->beginTransaction();
  200. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  201. foreach ($commitData as $key => $value) {
  202. # code...
  203. $stmtDEST->execute($value);
  204. }
  205. // 提交更改
  206. $PDO_DEST->commit();
  207. $commitData = [];
  208. echo "finished $count".PHP_EOL;
  209. $count = 0;
  210. }
  211. }else{
  212. fwrite(STDERR,"error: no word index - $wordIndex".PHP_EOL);
  213. }
  214. if($allSrcCount % 10000 ==0){
  215. fwrite(STDOUT,"find from src table $allSrcCount / $allInsertCount is new.".PHP_EOL) ;
  216. }
  217. }
  218. if($count>0){
  219. #最后的没有到10000的数据插入
  220. $PDO_DEST->beginTransaction();
  221. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  222. foreach ($commitData as $key => $value) {
  223. # code...
  224. $stmtDEST->execute($value);
  225. }
  226. // 提交更改
  227. $PDO_DEST->commit();
  228. $commitData = [];
  229. fwrite(STDOUT,"finished $count".PHP_EOL);
  230. }
  231. fwrite(STDOUT,"insert done $allInsertCount in $allSrcCount ".PHP_EOL);
  232. fwrite(STDOUT, "all done in ".(time()-$start)."s".PHP_EOL);