1、环境
- jdk1.8
- python2.7.x
-
datax下载
https://github.com/alibaba/DataX/blob/master/userGuid.md
windows推荐方法一下载解压即可,我的目录是D:\datax -
自检脚本
来到datx的bin目录下cmd执行
python datax.py ../job/job.json
出现这个页面就ok;乱码看着不舒服可以先执行
chcp 65001
更改一下字符编码;
2、配置示例
datax是通过json文件配置job的运行规则的
通过
python datax.py -r streamreader -w streamwriter
可获得示例脚本
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
根据此模板在bin下新建stream.json文件
例如:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界====="
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
-
执行脚本
cd 到bin目录
python datax.py stream.json
成功打印即可。 -
实际配置在github上有具体的详解
3、读取mysql输出到控制台
新建
mysqlreader.json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"interval": 30000,
"times": 0
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123qwe",
"column": ["*"],
"where":"day>=current_date()",
"splitPk": "",
"connection": [
{
"table": [
"sczb_30"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/poc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
数据库的连接信息就不说了,
sczb_30
为要读取的表名,
column
为字段名,我这里配的
*
通配符,
where
可以做筛选条件,官网说不能跟
limit
,其他的
order
,
group
什么的我也没试过,没有
where
就是全量读取。
保存后执行
python datax.py mysqlreader.json
4、mysql同步到mysql
新建
mysql2mysql.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123qwe",
"splitPk": "",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/poc?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true"
],
"querySql": [
"select * from TGEQUIPRECORDSUB where BEGINTIME>DATE_SUB(NOW(), INTERVAL 20 MINUTE);"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "update",
"username": "root",
"password": "root",
"column": [
"*"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
""
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.31.55:3306/datax?useUnicode=true&characterEncoding=utf8",
"table": [
"TGEQUIPRECORDSUB"
]
}
]
}
}
}
]
}
}
reader
是读,
writer
是写,这里要注意的是读取时我用了
querySql
来筛选数据,另外写入是
preSql
这里可以在写入之前执行sql,例如把表清空,但是要注意账号权限,
writeMode
是写入模式,insert是追加,update是有则更新无则追加。
执行测试前先看一下数据
源表:
我把这个字段由333改为444
目标表:
执行同步
python datax.py mysql2mysql.json
可以看到已经同步了。
5、定时执行达到延时同步的效果
新建bat脚本
job.bat
@echo off
title "job"
set INTERVAL=60000
::定时,单位是什么我给忘了,好像是秒?可以自己实验一下
timeout %INTERVAL%
:Again
for /F "tokens=1-3 delims=/ " %%d in ('date /t') do (
set "day=%%d"
set "month=%%e"
set "year=%%f"
)
d:
cd D:\datax\bin
:: 创建文件夹名,格式为YYYY-MM-DD
set "foldername=%year%-%month%-%day%"
:: 检查文件夹是否存在
if not exist "%foldername%" (
:: 创建文件夹
md "%foldername%"
) else (
echo 文件夹已存在,无需创建。
)
:: 这里的文件夹是日志输出文件按照日期划分
start /b cmd /k D: && cd D:\datax\bin && python datax.py ./mysql2mysql.json >> "%foldername%/test.log" 2>&1
echo %date% %time:~0,8%
timeout %INTERVAL%
ax.py ./mysql2mysql.json >> "%foldername%/test.log" 2>&1
echo %date% %time:~0,8%
timeout %INTERVAL%
goto Again
双击bat便可执行,在同目录下可观察到日志文件生成
看来单位是秒
但是怎么能容忍这个窗口的存在呢,看着巨丑,影响心情
新建
job.vbs
CreateObject("Wscript.Shell").Run "D:\datax\bin\job.bat", 0, False
以后只运行此脚本就行了,什么窗口也不弹出,但是日志文件在持续增加。
6、结语
一定要避免全量同步,要根据字段进行增量更新
多个任务要写多个脚本,但是都能集成到同一个bat脚本里启动
其他数据库同步都大差不差