20211204120600_wbw_blocks_copy.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. <?php
  2. /*
  3. 从旧数据表中提取数据插入到新的表
  4. 插入时用uuid判断是否曾经插入
  5. 曾经插入就不插入了
  6. */
  7. // Require Composer's autoloader.
  8. require_once __DIR__.'/../../../public/vendor/autoload.php';
  9. require_once __DIR__."/../../../public/app/config.php";
  10. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  11. set_exception_handler(function($e){
  12. fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
  13. fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
  14. fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
  15. exit;
  16. });
  17. $fpError = fopen(__DIR__.'/log/'.basename($_SERVER['PHP_SELF'],'.php').".err.data.csv",'w');
  18. # 更新索引表
  19. #user info
  20. $user_db=_FILE_DB_USERINFO_;#user数据库
  21. $user_table=_TABLE_USER_INFO_;#user表名
  22. $src_db=_SQLITE_DB_USER_WBW_;#源数据库
  23. $src_table=_SQLITE_TABLE_USER_WBW_BLOCK_;#源表名
  24. $dest_db=_PG_DB_USER_WBW_;#目标数据库
  25. $dest_table=_PG_TABLE_USER_WBW_BLOCK_;#目标表名
  26. # 雪花id
  27. $snowflake = new SnowFlakeId();
  28. fwrite(STDOUT, "migarate wbw_block".PHP_EOL);
  29. #打开user数据库
  30. $PDO_USER = new PDO($user_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  31. $PDO_USER->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  32. fwrite(STDOUT,"open user table".PHP_EOL);
  33. #打开源数据库
  34. $PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  35. $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  36. fwrite(STDOUT, "open src".PHP_EOL);
  37. #打开目标数据库
  38. $PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  39. $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  40. fwrite(STDOUT, "open dest".PHP_EOL);
  41. // 开始一个事务,关闭自动提交
  42. fwrite(STDOUT, "begin Transaction".PHP_EOL);
  43. $queryInsert = "INSERT INTO ".$dest_table."
  44. (
  45. id,
  46. uid,
  47. parent_id ,
  48. channel_uid,
  49. parent_channel_uid,
  50. creator_uid,
  51. editor_id,
  52. book_id,
  53. paragraph,
  54. style,
  55. lang,
  56. status,
  57. create_time,
  58. modify_time,
  59. created_at,
  60. updated_at
  61. )
  62. VALUES ( ? , ? , ? , ? , ? , ? ,? , ? , ? , ? , ? , ? , ? , ? , to_timestamp(?),to_timestamp(?))";
  63. $commitData = [];
  64. $allInsertCount = 0;
  65. $allSrcCount = 0;
  66. $count = 0;
  67. #从源数据表中读取
  68. $query = "SELECT * FROM ".$src_table;
  69. $stmtSrc = $PDO_SRC->prepare($query);
  70. $stmtSrc->execute();
  71. #从user数据表中读取
  72. $query = "SELECT id ,userid FROM ".$user_table." WHERE userid = ? or username = ? ";
  73. $stmtUser = $PDO_USER->prepare($query);
  74. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  75. $allSrcCount++;
  76. if($srcData["owner"]=='test6'){
  77. $srcData["owner"] = 'f81c7140-64b4-4025-b58c-45a3b386324a';
  78. }
  79. if($srcData["owner"]=='test28'){
  80. $srcData["owner"] = 'df0ad9bc-c0cd-4cd9-af05-e43d23ed57f0';
  81. }
  82. if($srcData["owner"]=='290fd808-2f46-4b8c-b300-0367badd67ed'){
  83. $srcData["owner"] = 'f81c7140-64b4-4025-b58c-45a3b386324a';
  84. }
  85. if($srcData["owner"]=='BA837178-9ABD-4DD4-96A0-D2C21B756DC4'){
  86. $srcData["owner"] = 'ba5463f3-72d1-4410-858e-eadd10884713';
  87. }
  88. $stmtUser->execute(array($srcData["owner"],$srcData["owner"]));
  89. $userId = $stmtUser->fetch(PDO::FETCH_ASSOC);
  90. if(!$userId){
  91. fwrite(STDERR,"no user id {$srcData["owner"]}".PHP_EOL);
  92. continue;
  93. }
  94. #插入目标表
  95. if(empty($srcData["book"])){
  96. fwrite(STDERR,"book is null {$uuid}".PHP_EOL);
  97. continue;
  98. }
  99. if(substr($srcData["book"],0,1)==="p"){
  100. $srcData["book"] = substr($srcData["book"],1);
  101. }
  102. if(empty($srcData["paragraph"])){
  103. fwrite(STDERR,"paragraph is null {$uuid}".PHP_EOL);
  104. continue;
  105. }
  106. if(empty($srcData["parent_id"])){
  107. $srcData["parent_id"] = NULL;
  108. }
  109. if(empty($srcData["channal"])){
  110. $srcData["channal"] = NULL;
  111. }
  112. if(empty($srcData["status"])){
  113. $srcData["status"] = 10;
  114. }
  115. if(empty($srcData["create_time"]) || $srcData["create_time"]<1532590551000){
  116. $srcData["create_time"]=1532590551000;
  117. }
  118. if(empty($srcData["modify_time"]) || $srcData["modify_time"]<1532590551000){
  119. $srcData["modify_time"]=1532590551000;
  120. }
  121. $uuid = $srcData["id"];
  122. #查询目标表中是否有相同数据
  123. $queryExsit = "SELECT id FROM ".$dest_table." WHERE uid = ? ";
  124. $getExist = $PDO_DEST->prepare($queryExsit);
  125. $getExist->execute(array($uuid));
  126. $exist = $getExist->fetch(PDO::FETCH_ASSOC);
  127. if(!$exist){
  128. #没有相同数据
  129. if(strlen($srcData["parent_id"])>36){
  130. fwrite(STDERR, "parent_id too long ".$srcData["parent_id"].PHP_EOL);
  131. continue;
  132. }
  133. if(strlen($srcData["channal"])>36){
  134. fwrite(STDERR, "channal too long ".$srcData["channal"].PHP_EOL);
  135. continue;
  136. }
  137. if(strlen($srcData["parent_channel"])>36){
  138. fwrite(STDERR, "parent_channel too long ".$srcData["parent_channel"].PHP_EOL);
  139. continue;
  140. }
  141. if(strlen($srcData["owner"])>36){
  142. fwrite(STDERR, "owner too long ".$srcData["owner"].PHP_EOL);
  143. continue;
  144. }
  145. if(empty($srcData["book"]) || !is_numeric($srcData["book"])){
  146. fwrite(STDERR,"book is error id=".$uuid.PHP_EOL);
  147. fputcsv($fpError,$srcData);
  148. continue;
  149. }
  150. if(empty($srcData["paragraph"]) || !is_numeric($srcData["paragraph"])){
  151. fwrite(STDERR,"paragraph is error id=".$uuid.PHP_EOL);
  152. fputcsv($fpError,$srcData);
  153. continue;
  154. }
  155. $commitData[] = array(
  156. $snowflake->id(),
  157. $uuid,
  158. $srcData["parent_id"],
  159. $srcData["channal"],
  160. $srcData["parent_channel"],
  161. $userId["userid"],
  162. $userId["id"],
  163. $srcData["book"],
  164. $srcData["paragraph"],
  165. $srcData["style"],
  166. $srcData["lang"],
  167. $srcData["status"],
  168. $srcData["create_time"],
  169. $srcData["modify_time"],
  170. $srcData["create_time"]/1000,
  171. $srcData["modify_time"]/1000
  172. );
  173. $count++;
  174. $allInsertCount++;
  175. }
  176. if($count ==10000){
  177. #10000行插入一次
  178. $PDO_DEST->beginTransaction();
  179. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  180. foreach ($commitData as $key => $value) {
  181. $stmtDEST->execute($value);
  182. }
  183. // 提交更改
  184. $PDO_DEST->commit();
  185. $commitData = [];
  186. fwrite(STDOUT, "finished $count".PHP_EOL);
  187. $count=0;
  188. }
  189. }
  190. if($count>0){
  191. #最后的剩余的数据插入
  192. $PDO_DEST->beginTransaction();
  193. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  194. foreach ($commitData as $key => $value) {
  195. $stmtDEST->execute($value);
  196. }
  197. // 提交更改
  198. $PDO_DEST->commit();
  199. $commitData = [];
  200. fwrite(STDOUT, "finished $count".PHP_EOL);
  201. }
  202. fwrite(STDOUT,"insert done $allInsertCount in $allSrcCount ".PHP_EOL);
  203. fwrite(STDOUT,"all done".PHP_EOL);
  204. fclose($fpError);