2
0

20211207171500_sent_historay_copy.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. <?php
  2. /*
  3. 迁移 sentence pr 库
  4. 从旧数据表中提取数据插入到新的表
  5. 插入时用uuid判断是否曾经插入
  6. 曾经插入就不插入了
  7. */
  8. require_once __DIR__."/../../../public/app/config.php";
  9. require_once __DIR__."/../../../public/app/public/snowflakeid.php";
  10. set_exception_handler(function($e){
  11. fwrite(STDERR,"error-msg:".$e->getMessage().PHP_EOL);
  12. fwrite(STDERR,"error-file:".$e->getFile().PHP_EOL);
  13. fwrite(STDERR,"error-line:".$e->getLine().PHP_EOL);
  14. exit;
  15. });
  16. $fpError = fopen(__DIR__.'/log/'.basename($_SERVER['PHP_SELF'],'.php').".err.data.csv",'w');
  17. $start = time();
  18. # 雪花id
  19. $snowflake = new SnowFlakeId();
  20. # 更新索引表
  21. $src_db=_SQLITE_DB_USER_SENTENCE_HISTORAY_;#源数据库
  22. $src_table=_SQLITE_TABLE_SENTENCE_HISTORAY_;#源表名
  23. $dest_db=_PG_DB_USER_SENTENCE_HISTORAY_;#目标数据库
  24. $dest_table=_PG_TABLE_SENTENCE_HISTORAY_;#目标表名
  25. fwrite(STDOUT,"migarate sent_historay".PHP_EOL);
  26. #打开源数据库
  27. $PDO_SRC = new PDO($src_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  28. $PDO_SRC->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  29. fwrite(STDOUT,"open src table".PHP_EOL);
  30. #打开目标数据库
  31. $PDO_DEST = new PDO($dest_db,_DB_USERNAME_,_DB_PASSWORD_,array(PDO::ATTR_PERSISTENT=>true));
  32. $PDO_DEST->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  33. fwrite(STDOUT,"open dest table".PHP_EOL);
  34. $queryInsert = "INSERT INTO ".$dest_table."
  35. (
  36. id,
  37. sent_uid,
  38. user_uid,
  39. content,
  40. landmark,
  41. create_time,
  42. created_at)
  43. VALUES ( ? , ? , ? , ? , ? , ? , to_timestamp(?))";
  44. $commitData = [];
  45. $allInsertCount = 0;
  46. $allSrcCount = 0;
  47. $count = 0;
  48. #从源数据表中读取
  49. $query = "SELECT * FROM ".$src_table;
  50. $stmtSrc = $PDO_SRC->prepare($query);
  51. $stmtSrc->execute();
  52. while($srcData = $stmtSrc->fetch(PDO::FETCH_ASSOC)){
  53. $allSrcCount++;
  54. #插入目标表
  55. #查询目标表中是否有相同数据
  56. $queryExsit = "SELECT id FROM ".$dest_table." WHERE sent_uid = ? and user_uid = ? and content=? and create_time=? ";
  57. $getExist = $PDO_DEST->prepare($queryExsit);
  58. $getExist->execute(array($srcData["sent_id"],$srcData["user_id"],$srcData["text"],$srcData["date"]));
  59. $exist = $getExist->fetch(PDO::FETCH_ASSOC);
  60. if(!$exist){
  61. #没有相同数据
  62. $commitData[] = array(
  63. $snowflake->id(),
  64. $srcData["sent_id"],
  65. $srcData["user_id"],
  66. $srcData["text"],
  67. $srcData["landmark"],
  68. $srcData["date"],
  69. $srcData["date"]/1000
  70. );
  71. $count++;
  72. $allInsertCount++;
  73. }
  74. if($count ==10000){
  75. #10000行插入一次
  76. // 开始一个事务,关闭自动提交
  77. $PDO_DEST->beginTransaction();
  78. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  79. foreach ($commitData as $key => $value) {
  80. $stmtDEST->execute($value);
  81. }
  82. // 提交更改
  83. $PDO_DEST->commit();
  84. $commitData = [];
  85. echo "finished $count".PHP_EOL;
  86. $count=0;
  87. }
  88. if($allSrcCount % 10000 ==0){
  89. echo "find from src table $allSrcCount / $allInsertCount is new.".PHP_EOL;
  90. }
  91. }
  92. if($count>0){
  93. #最后的没有到10000的数据插入
  94. $PDO_DEST->beginTransaction();
  95. $stmtDEST = $PDO_DEST->prepare($queryInsert);
  96. foreach ($commitData as $key => $value) {
  97. $stmtDEST->execute($value);
  98. }
  99. // 提交更改
  100. $PDO_DEST->commit();
  101. $commitData = [];
  102. echo "finished $count".PHP_EOL;
  103. }
  104. echo "insert done $allInsertCount in $allSrcCount ".PHP_EOL;
  105. fwrite(STDOUT, "all done in ".(time()-$start)."s".PHP_EOL);