2
0

20211210160700_user_dict_copy.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. <?php
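  /*
  Extract data from the old data tables and insert it into the new table.
  The original note here said inserts are de-duplicated by uuid (rows that were
  already inserted are skipped); the script below does not do that - it deletes
  every row of the destination table first and then re-inserts everything.
  */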
  7. require_once __DIR__."/../../../public/app/config.php";
  8. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  # Last-resort handler: report any uncaught exception on STDERR and stop the script.
  set_exception_handler(function (Throwable $e) {
      fwrite(STDERR, "error-msg:" . $e->getMessage() . PHP_EOL);
      fwrite(STDERR, "error-file:" . $e->getFile() . PHP_EOL);
      fwrite(STDERR, "error-line:" . $e->getLine() . PHP_EOL);
      # A bare `exit;` reports status 0 (success); use a non-zero status so the
      # shell / CI job that runs this migration can detect the failure.
      exit(1);
  });
  # Wall-clock start, used for the elapsed-time report at the end of the script.
  $start = time();
  # Snowflake id generator; supplies the id of every migrated row.
  $snowflake = new SnowFlakeId();
  # CSV file that receives the source rows rejected during the migration.
  $errLogPath = __DIR__ . '/log/' . basename($_SERVER['PHP_SELF'], '.php') . ".err.data.csv";
  $fpError = fopen($errLogPath, 'w');
  if ($fpError === false) {
      # Fail fast: without this check the first rejected row would hit fputcsv()
      # with an invalid handle, far away from the real cause.
      throw new RuntimeException("cannot open error log for writing: " . $errLogPath);
  }

  # Source / destination of the user dictionary migration.
  $src_db = _SQLITE_DB_WBW_;               # source database
  $src_table = _SQLITE_TABLE_DICT_WBW_;    # source table name
  $dest_db = _PG_DB_WBW_;                  # destination database
  $dest_table = _PG_TABLE_DICT_WBW_;       # destination table name
  echo "migarate user dict" . PHP_EOL;

  # Open the source database (read-only).
  $PDO_SRC = new PDO($src_db, _DB_USERNAME_, _DB_PASSWORD_, [
      PDO::ATTR_PERSISTENT => true,
      PDO::SQLITE_ATTR_OPEN_FLAGS => PDO::SQLITE_OPEN_READONLY,
  ]);
  $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  echo "open src" . PHP_EOL;

  # Open the destination database.
  $PDO_DEST = new PDO($dest_db, _DB_USERNAME_, _DB_PASSWORD_, [PDO::ATTR_PERSISTENT => true]);
  $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  echo "open dest" . PHP_EOL;
  # Remove every row from the destination table before the data is re-imported.
  fwrite(STDOUT, "delete dest" . PHP_EOL);
  $purgeSql = "delete from $dest_table where true;";
  $purgeStmt = $PDO_DEST->prepare($purgeSql);
  $purgeStmt->execute();
  # Parameterised INSERT used by the migration passes (18 columns / 18 values).
  # The last two placeholders are converted with to_timestamp() for
  # created_at / updated_at.
  $queryInsert = "INSERT INTO " . $dest_table . " ("
      . "id, word, type, grammar, parent, mean, note, factors, factormean, "
      . "status, source, language, confidence, creator_id, ref_counter, "
      . "create_time, created_at, updated_at"
      . ") VALUES ("
      . "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
      . "to_timestamp(?), to_timestamp(?)"
      . ")";

  /*
   * Write one batch of rows to the destination table inside a single transaction.
   *
   * $db   - destination connection
   * $stmt - prepared insert statement created from $queryInsert
   * $rows - list of positional value arrays, one per row
   *
   * On a PDOException the transaction is rolled back, the error message and the
   * row being written are printed to STDERR and the script stops with a
   * non-zero exit status.
   */
  $insertBatch = static function (PDO $db, PDOStatement $stmt, array $rows): void {
      if ($rows === []) {
          return;
      }
      $row = null;
      $db->beginTransaction();
      try {
          foreach ($rows as $row) {
              $stmt->execute($row);
          }
          $db->commit();
      } catch (PDOException $e) {
          # Never leave a half-written batch behind.
          if ($db->inTransaction()) {
              $db->rollBack();
          }
          fwrite(STDERR, $e->getMessage() . PHP_EOL);
          if (is_array($row)) {
              fwrite(STDERR, implode(',', $row) . PHP_EOL);
          }
          exit(1);
      }
  };

  echo "read from orginal" . PHP_EOL;
  $batchSize = 10000;      # rows per transaction
  $commitData = [];        # rows queued for the next batch
  $allInsertCount = 0;     # rows accepted for insertion
  $allSrcCount = 0;        # rows read from the source table

  # Prepare once and reuse for every batch.
  $stmtDEST = $PDO_DEST->prepare($queryInsert);

  # Read every row of the source table.
  $stmtSrc = $PDO_SRC->prepare("SELECT * FROM " . $src_table);
  $stmtSrc->execute();
  while ($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)) {
      $allSrcCount++;

      # Rows without a headword or a creator are not migrated; they are reported
      # on STDERR and kept in the error CSV.
      if (empty($srcData["pali"])) {
          fwrite(STDERR, "pali is null id=" . $srcData["id"] . PHP_EOL);
          fputcsv($fpError, $srcData);
          continue;
      }
      if (empty($srcData["creator"])) {
          fwrite(STDERR, "creator is null id=" . $srcData["id"] . PHP_EOL);
          fputcsv($fpError, $srcData);
          continue;
      }

      # Value order must match the column list of $queryInsert.
      $commitData[] = [
          $snowflake->id(),
          $srcData["pali"],
          $srcData["type"],
          $srcData["gramma"],
          $srcData["parent"],
          $srcData["mean"],
          $srcData["note"],
          $srcData["factors"],
          $srcData["factormean"],
          $srcData["status"],
          "_SYS_USER_WBW_",
          $srcData["language"],
          $srcData["confidence"],
          $srcData["creator"],
          $srcData["ref_counter"],
          $srcData["time"],   # create_time
          $srcData["time"],   # created_at
          $srcData["time"],   # updated_at
      ];
      $allInsertCount++;

      if (count($commitData) === $batchSize) {
          $insertBatch($PDO_DEST, $stmtDEST, $commitData);
          echo "finished " . count($commitData) . PHP_EOL;
          $commitData = [];
      }
      if ($allSrcCount % $batchSize === 0) {
          echo "find from src table $allSrcCount / $allInsertCount is new." . PHP_EOL;
      }
  }

  # Flush the final, partially filled batch.
  if ($commitData !== []) {
      $insertBatch($PDO_DEST, $stmtDEST, $commitData);
      echo "finished " . count($commitData) . PHP_EOL;
      $commitData = [];
  }
  echo "insert done $allInsertCount in $allSrcCount " . PHP_EOL;
  # Second pass: walk the index table and, for every index row, copy the word
  # row it points to into the destination table on behalf of the index's user.
  $src_table = _SQLITE_TABLE_DICT_WBW_INDEX_;   # source index table name
  $src_word_table = _SQLITE_TABLE_DICT_WBW_;    # source word table name
  echo "migarating usr dict index " . PHP_EOL;

  /*
   * Write one batch of rows to the destination table inside a single transaction.
   *
   * $db   - destination connection
   * $stmt - prepared insert statement created from $queryInsert
   * $rows - list of positional value arrays, one per row
   *
   * On a PDOException the transaction is rolled back, the error message and the
   * row being written are printed to STDERR and the script stops with a
   * non-zero exit status.
   */
  $insertIndexBatch = static function (PDO $db, PDOStatement $stmt, array $rows): void {
      if ($rows === []) {
          return;
      }
      $row = null;
      $db->beginTransaction();
      try {
          foreach ($rows as $row) {
              $stmt->execute($row);
          }
          $db->commit();
      } catch (PDOException $e) {
          # Never leave a half-written batch behind.
          if ($db->inTransaction()) {
              $db->rollBack();
          }
          fwrite(STDERR, $e->getMessage() . PHP_EOL);
          if (is_array($row)) {
              fwrite(STDERR, implode(',', $row) . PHP_EOL);
          }
          exit(1);
      }
  };

  $batchSize = 10000;      # rows per transaction
  $commitData = [];        # rows queued for the next batch
  $allInsertCount = 0;     # rows accepted for insertion
  $allSrcCount = 0;        # index rows read from the source table

  # Both statements are loop-invariant: prepare them once and reuse them.
  $stmtDEST = $PDO_DEST->prepare($queryInsert);
  $getWord = $PDO_SRC->prepare("SELECT * FROM " . $src_word_table . " WHERE id = ? ");

  # Read every row of the source index table.
  $stmtSrc = $PDO_SRC->prepare("SELECT * FROM " . $src_table);
  $stmtSrc->execute();
  while ($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)) {
      $allSrcCount++;

      # Look up the word row this index entry refers to.
      $wordIndex = $srcData["word_index"];
      $getWord->execute([$wordIndex]);
      $exist = $getWord->fetch(PDO::FETCH_ASSOC);
      $getWord->closeCursor();

      if ($exist) {
          # Incomplete rows are reported and skipped; only a missing user_id is
          # also recorded in the error CSV.
          if (empty($srcData["user_id"])) {
              fwrite(STDERR, "index user_id is null id=" . $srcData["id"] . PHP_EOL);
              fputcsv($fpError, $srcData);
              continue;
          }
          if (empty($exist["pali"])) {
              fwrite(STDERR, "pali is null id=" . $srcData["id"] . PHP_EOL);
              continue;
          }
          if (empty($exist["creator"])) {
              fwrite(STDERR, "creator is null id=" . $srcData["id"] . PHP_EOL);
              continue;
          }

          # Value order must match the column list of $queryInsert. The word
          # data comes from the word row, the creator from the index row.
          $commitData[] = [
              $snowflake->id(),
              $exist["pali"],
              $exist["type"],
              $exist["gramma"],
              $exist["parent"],
              $exist["mean"],
              $exist["note"],
              $exist["factors"],
              $exist["factormean"],
              (int)$exist["status"],
              "_USER_WBW_",
              $exist["language"],
              $exist["confidence"],
              $srcData["user_id"],
              $exist["ref_counter"],
              $exist["time"],   # create_time
              $exist["time"],   # created_at
              $exist["time"],   # updated_at
          ];
          $allInsertCount++;

          if (count($commitData) === $batchSize) {
              $insertIndexBatch($PDO_DEST, $stmtDEST, $commitData);
              echo "finished " . count($commitData) . PHP_EOL;
              $commitData = [];
          }
      } else {
          fwrite(STDERR, "error: no word index - $wordIndex" . PHP_EOL);
      }

      if ($allSrcCount % $batchSize === 0) {
          fwrite(STDOUT, "find from src table $allSrcCount / $allInsertCount is new." . PHP_EOL);
      }
  }

  # Flush the final, partially filled batch.
  if ($commitData !== []) {
      $insertIndexBatch($PDO_DEST, $stmtDEST, $commitData);
      fwrite(STDOUT, "finished " . count($commitData) . PHP_EOL);
      $commitData = [];
  }
  fwrite(STDOUT, "insert done $allInsertCount in $allSrcCount " . PHP_EOL);

  # Release the error CSV now that no more rejected rows can be written.
  if (is_resource($fpError)) {
      fclose($fpError);
  }
  fwrite(STDOUT, "all done in " . (time() - $start) . "s" . PHP_EOL);