Datax实现异构数据库全量同步和增量同步

  • Post author:
  • Post category:其他




1、环境

  • jdk1.8
  • python2.7.x
  • datax下载


    https://github.com/alibaba/DataX/blob/master/userGuid.md




    windows推荐方法一下载解压即可,我的目录是D:\datax
  • 自检脚本

    来到datx的bin目录下cmd执行

    python datax.py ../job/job.json




    出现这个页面就ok;乱码看着不舒服可以先执行

    chcp 65001

    更改一下字符编码;



2、配置示例

datax是通过json文件配置job的运行规则的

通过

python datax.py -r streamreader -w streamwriter

可获得示例脚本

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据此模板在bin下新建stream.json文件

例如:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界====="
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}
  • 执行脚本

    cd 到bin目录

    python datax.py stream.json




    成功打印即可。
  • 实际配置在github上有具体的详解



3、读取mysql输出到控制台

新建

mysqlreader.json

{
 "job": {
    "setting": {
        "speed": {
             "channel": 3,
             "interval": 30000,
             "times": 0
         },
         "errorLimit": {
         "record": 0,
         "percentage": 0.02
        }
    },
     "content": [
            {
         "reader": {
             "name": "mysqlreader",
             "parameter": {
             "username": "root",
             "password": "123qwe",
             "column": ["*"],
             "where":"day>=current_date()",
             "splitPk": "",
             "connection": [
                            {
                             "table": [
                                 "sczb_30"
                               ],
             "jdbcUrl": [
             "jdbc:mysql://127.0.0.1:3306/poc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"
                                ]
                            }
                        ]
                    }
                },
             "writer": {
                 "name": "streamwriter",
                 "parameter": {
                     "print": true
                    }
                }
            }
        ]
    }
}

数据库的连接信息就不说了,

sczb_30

为要读取的表名,

column

为字段名,我这里配的

*

通配符,

where

可以做筛选条件,官网说不能跟

limit

,其他的

order

,

group

什么的我也没试过,没有

where

就是全量读取。

保存后执行

python datax.py mysqlreader.json




4、mysql同步到mysql

新建

mysql2mysql.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123qwe",
                        "splitPk": "",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/poc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"
                                ],
                                "querySql": [
                                    "select * from TGEQUIPRECORDSUB where BEGINTIME>DATE_SUB(NOW(), INTERVAL 20 MINUTE);"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "*"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            ""
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.31.55:3306/datax?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "TGEQUIPRECORDSUB"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}


reader

是读,

writer

是写,这里要注意的是读取时我用了

querySql

来筛选数据,另外写入是

preSql

这里可以在写入之前执行sql,例如把表清空,但是要注意账号权限,

writeMode

是写入模式,insert是追加,update是有则更新无则追加。

执行测试前先看一下数据

源表:



我把这个字段由333改为444

目标表:



执行同步

python datax.py mysql2mysql.json




可以看到已经同步了。



5、定时执行达到延时同步的效果

新建bat脚本

job.bat

@echo off
title "job"
set INTERVAL=60000
::定时,单位是什么我给忘了,好像是秒?可以自己实验一下
timeout %INTERVAL%
 
:Again
 
for /F "tokens=1-3 delims=/ " %%d in ('date /t') do (
    set "day=%%d"
    set "month=%%e"
    set "year=%%f"
)
d:
cd D:\datax\bin
:: 创建文件夹名,格式为YYYY-MM-DD
set "foldername=%year%-%month%-%day%"

:: 检查文件夹是否存在
if not exist "%foldername%" (
    :: 创建文件夹
    md "%foldername%"
) else (
    echo 文件夹已存在,无需创建。
)

 :: 这里的文件夹是日志输出文件按照日期划分
start /b cmd /k D: && cd D:\datax\bin &&  python datax.py ./mysql2mysql.json >> "%foldername%/test.log" 2>&1
 
 
echo %date% %time:~0,8%
 
timeout %INTERVAL%
 
ax.py ./mysql2mysql.json >> "%foldername%/test.log" 2>&1
 
 
echo %date% %time:~0,8%
 
timeout %INTERVAL%
 
goto Again

双击bat便可执行,在同目录下可观察到日志文件生成





看来单位是秒


但是怎么能容忍这个窗口的存在呢,看着巨丑,影响心情


新建

job.vbs

CreateObject("Wscript.Shell").Run "D:\datax\bin\job.bat", 0, False

以后只运行此脚本就行了,什么窗口也不弹出,但是日志文件在持续增加。



6、结语

一定要避免全量同步,要根据字段进行增量更新

多个任务要写多个脚本,但是都能集成到同一个bat脚本里启动

其他数据库同步都大差不差



版权声明:本文为m0_66425707原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。