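DataX Source Code Analysis and Plugin Development
- What is DataX
- Framework design
- Downloading the source and building the project
- Job & Task concepts
- Physical run model
- Source code analysis (reentrant locks and bounded blocking queues not covered)
- Plugin development and debugging
- Plugin deployment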
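DataX Source Code Analysis and Writer Plugin Development
What is DataX
DataX is an offline data synchronization tool/platform widely used inside Alibaba Group. It provides efficient data synchronization between heterogeneous data sources such as MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.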
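Framework design
As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted as Reader and Writer plugins that plug into the synchronization framework.
- Reader: the data collection module. It reads data from the source and sends it to the Framework.
- Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
- Framework: connects the Reader and the Writer, serves as the data transfer channel between them, and handles the core technical concerns: buffering, flow control, concurrency, and data transformation.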
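The Reader/Framework/Writer relationship can be pictured with a small producer/consumer sketch. This is not DataX code: the class `ChannelSketch`, the `END` sentinel, and the use of plain strings as records are assumptions made for illustration, and the bounded queue merely stands in for the framework's buffering role.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical miniature of the Reader -> Framework channel -> Writer flow. */
final class ChannelSketch {
    // Sentinel telling the writer side that no more records will arrive.
    private static final String END = "__END__";

    /** Pushes every record through a bounded queue and returns what the writer side received. */
    static List<String> transfer(List<String> source, int capacity) {
        final BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        final List<String> sink = new ArrayList<>();

        // "Reader": sends records to the channel; put() blocks while the buffer is full.
        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(record);
                }
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // "Writer": keeps taking records from the channel until the sentinel shows up.
        Thread writer = new Thread(() -> {
            try {
                String record;
                while (!END.equals(record = channel.take())) {
                    sink.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the transfer", e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("a", "b", "c"), 2));
    }
}
```

Because the queue is bounded, a fast reader is slowed down automatically when the writer falls behind, which is the simplest form of the buffering and flow control the Framework is said to provide.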
Downloading the source and building the project
Source download URL:
https://github.com/alibaba/DataX
Build command:
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
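Job & Task concepts
DataX's logical model has two dimensions, job and task: a job is split into tasks, and the tasks are then merged into taskGroups for execution.
- A job instance runs inside the JobContainer. It is the master of all tasks and is responsible for initialization, splitting, scheduling, running, cleanup, monitoring, and reporting, but it does not perform the actual data synchronization.
- Job: DataX's description of a synchronization job from one source to one destination. It is the smallest business unit of DataX data synchronization. For example: synchronizing one MySQL table into a specific partition of an ODPS table.
- Task: the smallest execution unit, obtained by splitting a Job to maximize concurrency. For example, a Job that reads a MySQL table with 1024 shards is split into 1024 read Tasks, which are executed by some number of concurrent workers.
- TaskGroup: a set of Tasks. The Tasks executed under the same TaskGroupContainer are called a TaskGroup.
- JobContainer: the Job executor, the unit of work responsible for a Job's global splitting, scheduling, pre-statements, and post-statements. Its role is similar to that of the JobTracker in classic Hadoop MapReduce.
- TaskGroupContainer: the TaskGroup executor, the unit of work that runs a group of Tasks. Its role is similar to that of the TaskTracker in classic Hadoop MapReduce.
In short, a Job is split into Tasks, which run in containers provided by the framework; a plugin only needs to implement the Job and Task logic.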
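Physical run model
The framework gives plugins their physical execution capability (threads). The DataX framework has three run modes:
Standalone: single process, no external dependencies.
Local: single process; statistics and error information are reported to centralized storage.
Distributed: multiple processes on multiple machines; depends on the DataX Service.
These three modes make no difference to how a plugin is written: avoid a few small mistakes and the plugin switches seamlessly between single-machine and distributed execution. When the JobContainer and the TaskGroupContainer run in the same process, that is single-machine mode (Standalone and Local); when they run in different processes, that is Distributed mode.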
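Source code analysis
1. Startup flow:
Note: in the flow diagram, yellow marks the execution phases of the Job part, blue marks those of the Task part, and green marks those of the framework.
2. Project entry point:
The entry point of DataX is the Engine class. Its main method receives the runtime arguments and then calls Engine's entry method to start the whole project.
Runtime arguments: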
-mode standalone
-jobid -1
-job C:\Users\Administrator\Desktop\19年新功能\datax-json\test_datax.json
The main method:
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
LOG.info("##### 请求参数 #####");
for(String arg:args){
LOG.info(arg);
}
LOG.info("##### 请求参数 #####");
System.setProperty("datax.home","D:\\Code\\ideaCode\\DataX-master\\target\\datax\\datax");
Engine.entry(args);
} catch (Throwable e) {
exitCode = 1;
LOG.error("经DataX智能分析,该任务最可能的错误原因是:" + ExceptionTracker.trace(e));
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}
System.exit(exitCode);
}
System.exit(exitCode);
}
3. What the entry method does:
It does two main things: it calls ConfigParser.parse(String path) to parse the job file into a Configuration, and it creates a new Engine and calls that Engine's start() method.
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
String jobPath = cl.getOptionValue("job");
// If the user does not specify a jobid explicitly, datax.py passes -1 as the default
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
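/*
1. Parse job.json (the data extraction job file) into a Configuration.
Parsing the configuration goes through the following steps:
1. Parse the job configuration from the job.json file named in the startup arguments.
2. Parse DataX's built-in configuration from the default core.json file.
3. Parse the reader and writer plugin configuration named in job.json.
4. Load the corresponding plugin information.
*/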
Configuration configuration = ConfigParser.parse(jobPath);
long jobId;
if (!"-1".equalsIgnoreCase(jobIdString)) {
jobId = Long.parseLong(jobIdString);
} else {
// only for dsc & ds & datax 3 update
String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
dsJobUrlPatternString, dsTaskGroupUrlPatternString);
jobId = parseJobIdFromUrl(patternStringList, jobPath);
}
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
if (!isStandAloneMode && jobId == -1) {
// Outside standalone mode, jobId must not be -1
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
}
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
// Print vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
LOG.info(vmInfo.toString());
}
LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
LOG.debug(configuration.toJSON());
ConfigurationValidate.doValidate(configuration);
// 2. Create a new Engine and call start() to launch it
Engine engine = new Engine();
engine.start(configuration);
}
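The listing calls parseJobIdFromUrl() without showing it. A minimal sketch of what such a helper could look like, assuming it simply tries each regular expression in turn and returns the first captured number (the class name `JobIdSketch` and the example URL are hypothetical; the real method may differ):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the parseJobIdFromUrl helper used by entry(). */
final class JobIdSketch {
    /** Returns the number captured by the first matching pattern, or -1 if nothing matches. */
    static long parseJobIdFromUrl(List<String> patterns, String url) {
        for (String pattern : patterns) {
            Matcher matcher = Pattern.compile(pattern).matcher(url);
            if (matcher.find()) {
                return Long.parseLong(matcher.group(1));
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The three patterns are the ones listed in entry().
        List<String> patterns = Arrays.asList(
                "/instance/(\\d{1,})/config.xml",
                "/inner/job/(\\d{1,})/config",
                "/inner/job/(\\d{1,})/taskGroup/");
        // A URL-style job path carries the id; a local file path does not.
        System.out.println(parseJobIdFromUrl(patterns, "http://host/inner/job/42/config"));
        System.out.println(parseJobIdFromUrl(patterns, "test_datax.json"));
    }
}
```

With a local job file, as in the runtime arguments shown earlier, no pattern matches and the id stays -1, which entry() only accepts in standalone mode.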
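Parsing job.json into a Configuration takes three main steps:
- Parse the job configuration from the job.json file named in the startup arguments.
- Parse DataX's built-in configuration from the default core.json file.
- Parse the reader and writer plugin configuration named in job.json.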
public static Configuration parse(final String jobPath) {
// 1. Parse the job configuration from the job.json file named in the startup arguments
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
// 2. Parse DataX's built-in configuration from the default core.json file
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo: config optimization, only capture the plugins that are needed
// 3. Parse the reader and writer plugin configuration named in job.json
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
try {
// 4. Load the corresponding plugin information
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
// Swallow the exception to keep the log clean; the message is enough here
LOG.warn(String.format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
//
}
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}
return configuration;
}
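The Configuration is built from three configuration files: job.json, core.json, and plugin.json. The code above reads these three files and merges them into the Configuration object the project needs.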
job.json
{
"job":{
"content":[{
"reader":{
"parameter":{
"password":"iflytek",
"column":[
"sed_account",
"rec_account",
"sed_zjhm",
"sed_name",
"rec_zjhm",
"rec_name",
"trans_type",
"channel",
"amount",
"detail",
"trans_date",
"increment_id"
],
"connection":[{
"jdbcUrl":[
"jdbc:mysql://xxxxx:3306/zstp"
],
"table":[
"r_transfer"
]
}],
"where":"increment_id>0 and increment_id<=44",
"splitPk":"increment_id",
"username":"root"
},
"name":"mysqlreader"
},
"writer":{
"parameter":{
"writerParams":{
"columnsMapping":{
"amount":{"name":"交易_xx"},
"trans_date":{"name":"交易_xx"},
"sed_zjhm":{"name":"交xx易_xx"},
"channel":{
"name":"交易_xx"},
"increment_id":{
"name":"交易_xxID"},
"sed_name":{
"name":"交易_xx"},
"rec_account":{
"name":"账号_xx",
"type":"end"},
"detail":{
"name":"交易_xx"},
"sed_name":{
"name":"交易_xx"},
"rec_account":{
"name":"账号_xx",
"type":"start"},
"rec_zjhm":{
"name":"交易_xx"},
"trans_type":{
"name":"交易_xx"}
},
"direction":"positive",
"graphConfigInfo":{
"cachedbCache":"true",
"cachedbCacheCleanWait":"20",
"cachedbCacheSize":"0.5",
"cachedbCacheTime":"180000",
"gremlinGraph":"org.janusgraph.core.JanusGraphFactory",
"indexSearchBackend":"elasticsearch",
"indexSearchClientOnly":"true",
"indexSearchHostname":"yyyyyyy",
"storageBackend":"hbase",
"storageBatchLoading":"true",
"storageHbaseTable":"JDModel",
"storageHostname":"yyyyyyyy"
},
"importType":"relation",
"insertType":"edge",
"isPeriod":"2",
"kafkaConf":{
"brokerHost":"yyyyyy:9092",
"dirtyDataTopic":"datax_plugin_task_dirtyData",
"staticTopic":"datax_plugin_task_importStatic"},
"moveDuplicate":"true",
"ownerId":"229061068735066112",
"sourceColumns":[
{"name":"sed_account","type":"String"},
{"name":"rec_account","type":"String"},
{"name":"sed_zjhm","type":"String"},
{"name":"sed_name","type":"String"},
{"name":"rec_zjhm","type":"String"},
{"name":"rec_name","type":"String"},
{"name":"trans_type","type":"String"},
{"name":"channel","type":"String"},
{"name":"amount","type":"Float"},
{"name":"detail","type":"String"},
{"name":"trans_date","type":"Date"},
{"name":"increment_id","type":"Integer"}
],
"tableName":"交易",
"taskId":"230918959959814144"
}
},
"name":"janusgraphwriter"
}
}],
"setting":{
"speed":{
"channel":4
}
}
}
}
core.json
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}
plugin.json
{
"name": "mysqlreader",
"class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba"
}
{
"name": "janusgraphwriter",
"class": "com.alibaba.datax.plugin.writer.janusgraphwriter.JanusgraphWriter",
"description": "useScene: dev. mechanism: via org.janusgraph.core.JanusGraphFactory.class connect Janusgraph to write data .",
"developer": "wangzhou"
}
The merged Configuration
{
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": ["yyyyMMdd"],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["sed_account", "rec_account", "sed_zjhm", "sed_name", "rec_zjhm", "rec_name", "trans_type", "channel", "amount", "detail", "trans_date", "increment_id"],
"connection": [{
"jdbcUrl": ["jdbc:mysql://172.31.95.34:3306/zstp"],
"table": ["r_transfer"]
}],
"password": "iflytek",
"splitPk": "increment_id",
"username": "root",
"where": "increment_id>0 and increment_id<=44"
}
},
"writer": {
"name": "janusgraphwriter",
"parameter": {
"writerParams": {
"columnsMapping": {
"amount": {
"name": "交易_xx"
},
"channel": {
"name": "交易_xx"
},
"detail": {
"name": "交易_xx"
},
"increment_id": {
"name": "交易_xxID"
},
"rec_account": {
"name": "账号_xx",
"type": "start"
},
"rec_zjhm": {
"name": "交易_xx"
},
"sed_name": {
"name": "交易_xx"
},
"sed_zjhm": {
"name": "交易_xx"
},
"trans_date": {
"name": "交易_xx"
},
"trans_type": {
"name": "交易_xx"
}
},
"direction": "positive",
"graphConfigInfo": {
"cachedbCache": "true",
"cachedbCacheCleanWait": "20",
"cachedbCacheSize": "0.5",
"cachedbCacheTime": "180000",
"gremlinGraph": "org.janusgraph.core.JanusGraphFactory",
"indexSearchBackend": "elasticsearch",
"indexSearchClientOnly": "true",
"indexSearchHostname": "171.31.95.50",
"storageBackend": "hbase",
"storageBatchLoading": "true",
"storageHbaseTable": "JDModel",
"storageHostname": "171.31.95.50"
},
"importType": "relation",
"insertType": "edge",
"isPeriod": "2",
"kafkaConf": {
"brokerHost": "171.31.95.50:9092",
"dirtyDataTopic": "datax_plugin_task_dirtyData",
"staticTopic": "datax_plugin_task_importStatic"
},
"moveDuplicate": "true",
"ownerId": "229061068735066112",
"sourceColumns": [{
"name": "sed_account",
"type": "String"
}, {
"name": "rec_account",
"type": "String"
}, {
"name": "sed_zjhm",
"type": "String"
}, {
"name": "sed_name",
"type": "String"
}, {
"name": "rec_zjhm",
"type": "String"
}, {
"name": "rec_name",
"type": "String"
}, {
"name": "trans_type",
"type": "String"
}, {
"name": "channel",
"type": "String"
}, {
"name": "amount",
"type": "Float"
}, {
"name": "detail",
"type": "String"
}, {
"name": "trans_date",
"type": "Date"
}, {
"name": "increment_id",
"type": "Integer"
}],
"tableName": "交易",
"taskId": "230918959959814144"
}
}
}
}],
"setting": {
"speed": {
"channel": 4
}
}
},
"plugin": {
"reader": {
"mysqlreader": {
"class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba",
"name": "mysqlreader",
"path": "D:\\Code\\ideaCode\\DataX-master\\target\\datax\\datax\\plugin\\reader\\mysqlreader"
}
},
"writer": {
"janusgraphwriter": {
"class": "com.alibaba.datax.plugin.writer.janusgraphwriter.JanusgraphWriter",
"description": "useScene: dev. mechanism: via org.janusgraph.core.JanusGraphFactory.class connect Janusgraph to write data .",
"developer": "wangzhou",
"name": "janusgraphwriter",
"path": "D:\\Code\\ideaCode\\DataX-master\\target\\datax\\datax\\plugin\\writer\\janusgraphwriter"
}
}
}
}
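The merge calls in ConfigParser.parse() all pass false as the second argument. Assuming that flag means "keep the current value when both sides define the same key", the merge behaves roughly like the recursive map merge below. This is a sketch on plain maps, not the DataX Configuration class; `MergeSketch` and its `merge` method are names made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical nested-map merge illustrating "existing values win" semantics. */
final class MergeSketch {
    /** Copies entries of other into base; an existing leaf value is kept unless overwrite is true. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> base, Map<String, Object> other, boolean overwrite) {
        for (Map.Entry<String, Object> entry : other.entrySet()) {
            Object mine = base.get(entry.getKey());
            Object theirs = entry.getValue();
            if (mine instanceof Map && theirs instanceof Map) {
                // Both sides have a nested object: merge key by key.
                merge((Map<String, Object>) mine, (Map<String, Object>) theirs, overwrite);
            } else if (mine == null || overwrite) {
                base.put(entry.getKey(), theirs);
            }
        }
        return base;
    }

    public static void main(String[] args) {
        Map<String, Object> jobSpeed = new LinkedHashMap<>();
        jobSpeed.put("channel", 4);
        Map<String, Object> job = new LinkedHashMap<>();
        job.put("speed", jobSpeed);

        Map<String, Object> coreSpeed = new LinkedHashMap<>();
        coreSpeed.put("channel", 5);
        coreSpeed.put("byte", -1);
        Map<String, Object> core = new LinkedHashMap<>();
        core.put("speed", coreSpeed);

        // channel stays 4 (already set by the job); byte is filled in from the core defaults.
        System.out.println(merge(job, core, false));
    }
}
```

This matches what the merged Configuration above shows: keys from job.json, core.json, and the plugin descriptors end up side by side in one tree.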
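The main purpose of **Engine.start()** is to create the container (a JobContainer when running as a job, otherwise a TaskGroupContainer) and call its start() method; in the standalone case discussed here that means JobContainer.start():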
public void start(Configuration allConf) {
// Bind the column conversion settings
ColumnCast.bind(allConf);
/**
* Initialize the PluginLoader, which gives access to the plugin configurations
*/
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
// JobContainer sets and adjusts this value after schedule
int channelNumber =0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
} else {
container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}
// perfTrace is enabled by default
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
// datax shell jobs in standalone mode do not report
if(instanceId == -1){
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();
}
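4. JobContainer.start():
JobContainer.start() is the core of the whole framework. It runs the job's preHandle(), init(), prepare(), split(), schedule(), post(), and postHandle() methods in order; the most important are init(), split(), and schedule(). The main steps are:
- Run the job's preHandle(). Not covered for now.
- Run the job's init(). Worth close attention.
- Run the job's prepare(), which prepares the reader and writer plugins by calling each plugin's prepare() method. Around each call the framework saves the current thread's classLoader, sets the thread's classLoader to the plugin's loader, and afterwards restores the saved one. Every plugin has its own jarLoader, implemented by extending URLClassLoader. Not covered for now.
- Run the job's split(). The first step inside split() is adjustChannelNumber(), which computes the number of channels, mainly from the byte and record speed limits. The reader and writer are then split at the finest granularity. The reader plugin splits according to the channel count, although some reader plugins may ignore it; the writer's split follows the reader's result so that both produce the same number of tasks, which the 1:1 channel model requires. Inside split(), mergeReaderAndWriterTaskConfigs() combines the reader, writer, and transformer configurations into task configurations and rewrites job.content. Worth close attention.
- Run the job's schedule(). The task configurations produced by split() are assigned to taskGroups; the number of taskGroups is the channel count divided by the number of channels a single taskGroup supports, rounded up. schedule() delegates to AbstractScheduler.schedule(), which calls startAllTaskGroup() to create all the TaskGroupContainers that organize the tasks. A TaskGroupContainerRunner runs each TaskGroupContainer; taskGroupContainerExecutorService starts a fixed thread pool to execute the TaskGroupContainerRunner objects, whose run() method calls taskGroupContainer.start(), which creates a TaskExecutor for each channel and starts the task through taskExecutor.doStart(). Worth close attention.
- Run the job's post() and postHandle(). Not covered for now.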
public void start() {
LOG.info("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
this.preHandle();
LOG.debug("jobContainer starts to do init ...");
this.init();
LOG.info("jobContainer starts to do prepare ...");
this.prepare();
LOG.info("jobContainer starts to do split ...");
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
this.schedule();
LOG.debug("jobContainer starts to do post ...");
this.post();
LOG.debug("jobContainer starts to do postHandle ...");
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
this.invokeHooks();
}
} catch (Throwable e) {
LOG.error("Exception when job run", e);
hasException = true;
if (e instanceof OutOfMemoryError) {
this.destroy();
System.gc();
}
if (super.getContainerCommunicator() == null) {
// containerCollector is initialized in scheduler(), so if an exception occurs before scheduler() it has to be initialized here
AbstractContainerCommunicator tempContainerCollector;
// standalone
tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
super.setContainerCommunicator(tempContainerCollector);
}
Communication communication = super.getContainerCommunicator().collect();
// The state before reporting does not need to be set manually
// communication.setState(State.FAILED);
communication.setThrowable(e);
communication.setTimestamp(this.endTimeStamp);
Communication tempComm = new Communication();
tempComm.setTimestamp(this.startTransferTimeStamp);
Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
super.getContainerCommunicator().report(reportCommunication);
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
if(!isDryRun) {
this.destroy();
this.endTimeStamp = System.currentTimeMillis();
if (!hasException) {
// Finally, print the average CPU usage and the GC statistics
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
this.logStatistics();
}
}
}
}
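Job initialization
init() initializes the reader and writer plugins from the configuration. This involves loading classes through a URLClassLoader, hot-loading the plugin jars, calling each plugin's init() method, and setting the reader's and writer's configuration. initJobReader() and initJobWriter() are much alike: each looks up the plugin's class from the JSON configuration, loads it, instantiates the plugin object, calls the plugin's init() method, and returns the plugin: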
private void init() {
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
this.jobId = 0;
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
}
Thread.currentThread().setName("job-" + this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
// The Reader must be initialized before the Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
}
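The class loader handling mentioned for init() and prepare() follows a save, swap, restore pattern. The sketch below shows that pattern in isolation; `ClassLoaderSwapSketch` is a hypothetical class written for this article, not the framework's own swapper, and the empty URL array stands in for the jars of a plugin directory.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical illustration of swapping the thread context class loader around plugin calls. */
final class ClassLoaderSwapSketch {
    private ClassLoader stored;

    /** Remembers the thread's current context loader and installs the plugin loader. */
    ClassLoader set(ClassLoader pluginLoader) {
        stored = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(pluginLoader);
        return stored;
    }

    /** Puts the remembered loader back and returns the loader that was active until now. */
    ClassLoader restore() {
        ClassLoader active = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(stored);
        return active;
    }

    public static void main(String[] args) {
        // Assumption: a real plugin loader would be given the URLs of the plugin's jar files.
        URLClassLoader pluginLoader =
                new URLClassLoader(new URL[0], ClassLoaderSwapSketch.class.getClassLoader());
        ClassLoaderSwapSketch swapper = new ClassLoaderSwapSketch();
        swapper.set(pluginLoader);
        try {
            // Plugin code such as init(), prepare(), or split() would run here.
            System.out.println(Thread.currentThread().getContextClassLoader() == pluginLoader);
        } finally {
            swapper.restore();
        }
    }
}
```

Giving each plugin its own loader keeps the dependencies of different readers and writers from clashing, and restoring in a finally block makes sure framework code never keeps running under a plugin's loader.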
The Job split process
split() does three main things:
- adjustChannelNumber(): adjust the number of channels
- doReaderSplit() & doWriterSplit()
- mergeReaderAndWriterTaskConfigs()
private int split() {
this.adjustChannelNumber();
if (this.needChannelNumber <= 0) {
this.needChannelNumber = 1;
}
System.out.println("**********通道数:"+needChannelNumber);
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
System.out.println("**********taskNumber:"+taskNumber);
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
/**
* Input: the parameter lists of the reader and writer; output: the list of elements under content
*/
List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
readerTaskConfigs, writerTaskConfigs, transformerList);
LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
return contentConfig.size();
}
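adjustChannelNumber() computes the number of channels from the byte limit and the record limit; only when neither limit is set does it fall back to the channel count configured for the job: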
private void adjustChannelNumber() {
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;
boolean isByteLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (isByteLimit) {
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// With byte-based flow control, the per-channel maximum must be set; otherwise this is an error
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
// Fail fast: a missing or non-positive per-channel limit would break the division below
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
}
boolean isRecordLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (isRecordLimit) {
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
// Fail fast: a missing or non-positive per-channel limit would break the division below
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
}
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
}
// Take the smaller of the two values
this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
needChannelNumberByByte : needChannelNumberByRecord;
// If needChannelNumber was determined by the byte or record limit, return
if (this.needChannelNumber < Integer.MAX_VALUE) {
return;
}
boolean isChannelLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (isChannelLimit) {
this.needChannelNumber = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
}
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"Job运行速度必须设置");
}
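The decision order in adjustChannelNumber() is easier to follow with concrete numbers. The sketch below restates the same arithmetic as a pure function; `ChannelNumberSketch` and its parameter names are made up for illustration, and a non-positive argument is taken to mean "not configured".

```java
/** Hypothetical restatement of the channel-count arithmetic as a pure function. */
final class ChannelNumberSketch {
    static int needChannelNumber(long jobByteLimit, long channelByteLimit,
                                 long jobRecordLimit, long channelRecordLimit,
                                 int jobChannel) {
        int byByte = Integer.MAX_VALUE;
        int byRecord = Integer.MAX_VALUE;
        if (jobByteLimit > 0) {
            if (channelByteLimit <= 0) {
                throw new IllegalArgumentException("per-channel byte limit must be positive");
            }
            // Global limit divided by per-channel limit, but never fewer than one channel.
            byByte = (int) Math.max(1, jobByteLimit / channelByteLimit);
        }
        if (jobRecordLimit > 0) {
            if (channelRecordLimit <= 0) {
                throw new IllegalArgumentException("per-channel record limit must be positive");
            }
            byRecord = (int) Math.max(1, jobRecordLimit / channelRecordLimit);
        }
        // The stricter of the two limits wins.
        int need = Math.min(byByte, byRecord);
        if (need < Integer.MAX_VALUE) {
            return need;
        }
        // Neither limit is configured: use the explicit channel setting.
        if (jobChannel > 0) {
            return jobChannel;
        }
        throw new IllegalArgumentException("job speed must be configured");
    }

    public static void main(String[] args) {
        // 10 MiB/s over 1 MiB/s gives 10; 100000 over 50000 records/s gives 2; the minimum is 2.
        System.out.println(needChannelNumber(10485760L, 1048576L, 100000L, 50000L, 4));
        // No byte or record limit, as in the sample job.json: the channel setting 4 is used.
        System.out.println(needChannelNumber(0L, -1L, 0L, -1L, 4));
    }
}
```

Note that the explicit channel setting is only consulted when neither a byte limit nor a record limit is configured for the job.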
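doReaderSplit() and doWriterSplit() switch to the plugin's class loader (its classes and jars) and then call the plugin's own split() method, which splits the work and returns copies of the plugin configuration, one per task. The reader is split according to the channel count, and the writer according to the number of reader splits, so that reader and writer tasks pair up 1:1: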
private List<Configuration> doReaderSplit(int adviceNumber) {
LOG.info("*********** 传入reader的通道数:"+adviceNumber);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber);
if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"reader切分的task数目不能小于等于0");
}
LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
this.readerPluginName, readerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return readerSlicesConfigs;
}
private List<Configuration> doWriterSplit(int readerTaskNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
List<Configuration> writerSlicesConfigs = this.jobWriter
.split(readerTaskNumber);
if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"writer切分的task不能小于等于0");
}
LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
this.writerPluginName, writerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return writerSlicesConfigs;
}
// mergeReaderAndWriterTaskConfigs() builds the Configuration of each reader + writer task
private List<Configuration> mergeReaderAndWriterTaskConfigs(
List<Configuration> readerTasksConfigs,
List<Configuration> writerTasksConfigs,
List<Configuration> transformerConfigs) {
if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
readerTasksConfigs.size(), writerTasksConfigs.size())
);
}
List<Configuration> contentConfigs = new ArrayList<Configuration>();
for (int i = 0; i < readerTasksConfigs.size(); i++) {
Configuration taskConfig = Configuration.newDefault();
taskConfig.set(CoreConstant.JOB_READER_NAME,
this.readerPluginName);
taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
readerTasksConfigs.get(i));
taskConfig.set(CoreConstant.JOB_WRITER_NAME,
this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
writerTasksConfigs.get(i));
if(transformerConfigs!=null && transformerConfigs.size()>0){
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
}
taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
}
return contentConfigs;
}
The Job schedule process
schedule() does two main things, grouping and starting:
- Group the tasks into taskGroups, producing List<Configuration> taskGroupConfigs.
- Start the taskGroup objects via scheduler.schedule(taskGroupConfigs).
private void schedule() {
/**
* The global speed and the per-channel speed are both expressed in B/s here
*/
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskNumber = this.configuration.getList(
CoreConstant.DATAX_JOB_CONTENT).size();
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
PerfTrace.getInstance().setChannelNumber(needChannelNumber);
/**
* Use the configuration to determine which tasks each taskGroup has to run
*/
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
// Set executeMode
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}
if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
if (this.jobId <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
}
}
LOG.info("Running by {} Mode.", executeMode);
this.startTransferTimeStamp = System.currentTimeMillis();
scheduler.schedule(taskGroupConfigs);
this.endTransferTimeStamp = System.currentTimeMillis();
} catch (Exception e) {
LOG.error("运行scheduler 模式[{}]出错.", executeMode);
this.endTransferTimeStamp = System.currentTimeMillis();
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
/**
* Check how the job ran
*/
this.checkLimit();
}
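Grouping comes first. Before grouping, needChannelNumber is adjusted: it is set to the smaller of the needChannelNumber obtained in the previous split() step and the number of tasks. Then assignFairly() is called. assignFairly() does three main things:
- Determine the number of task groups, taskGroupNumber: the channel count divided by the number of channels a single taskGroup supports, rounded up.
- Assign the tasks to the groups.
- Optimize the grouping.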
public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
Validate.isTrue(configuration != null, "框架获得的 Job 不能为 null.");
List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
Validate.isTrue(contentConfig.size() > 0, "框架获得的切分后的 Job 无内容.");
Validate.isTrue(channelNumber > 0 && channelsPerTaskGroup > 0,
"每个channel的平均task数[averTaskPerChannel],channel数目[channelNumber],每个taskGroup的平均channel数[channelsPerTaskGroup]都应该为正数");
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
Configuration aTaskConfig = contentConfig.get(0);
String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
boolean hasLoadBalanceResourceMark = StringUtils.isNotBlank(readerResourceMark) ||
StringUtils.isNotBlank(writerResourceMark);
if (!hasLoadBalanceResourceMark) {
// Fake a fixed key as the resource mark (either the reader or the writer would do; the reader is used here)
for (Configuration conf : contentConfig) {
conf.set(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK, "aFakeResourceMarkForLoadBalance");
}
// Shuffle once, to cover plugins that did not set a resource mark
Collections.shuffle(contentConfig, new Random(System.currentTimeMillis()));
}
LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
// Adjust the number of channels of each taskGroup (an optimization)
adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
return taskGroupConfig;
}
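The number of task groups follows directly from two lines of the code shown so far: the Math.min() in schedule() and the Math.ceil() in assignFairly(). A small sketch with concrete numbers (the class and method names are hypothetical):

```java
/** Hypothetical helper reproducing the taskGroupNumber arithmetic. */
final class TaskGroupCountSketch {
    static int taskGroupNumber(int needChannelNumber, int taskNumber, int channelsPerTaskGroup) {
        // schedule(): there is no point in having more channels than tasks.
        int channelNumber = Math.min(needChannelNumber, taskNumber);
        // assignFairly(): round up so that leftover channels still get a group.
        return (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
    }

    public static void main(String[] args) {
        // 4 channels requested, 3 tasks, 5 channels per group: one taskGroup.
        System.out.println(taskGroupNumber(4, 3, 5));
        // 12 channels, 100 tasks, 5 channels per group: three taskGroups.
        System.out.println(taskGroupNumber(12, 100, 5));
    }
}
```

With the sample configuration in this article (channel 4 in job.json, taskGroup.channel 5 in core.json), everything fits into a single taskGroup.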
doAssign() distributes the tasks among the taskGroups, based on the resourceMarkAndTaskIdMap and taskGroupNumber passed in:
private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
Configuration taskGroupTemplate = jobConfiguration.clone();
taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);
List<Configuration> result = new LinkedList<Configuration>();
List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
for (int i = 0; i < taskGroupNumber; i++) {
taskGroupConfigList.add(new LinkedList<Configuration>());
}
int mapValueMaxLength = -1;
List<String> resourceMarks = new ArrayList<String>();
for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
resourceMarks.add(entry.getKey());
if (entry.getValue().size() > mapValueMaxLength) {
mapValueMaxLength = entry.getValue().size();
}
}
int taskGroupIndex = 0;
for (int i = 0; i < mapValueMaxLength; i++) {
for (String resourceMark : resourceMarks) {
if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
taskGroupIndex++;
resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
}
}
}
Configuration tempTaskGroupConfig;
for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
result.add(tempTaskGroupConfig);
}
return result;
}
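The round-robin distribution performed by doAssign() can be illustrated with a small standalone sketch. This is not DataX code: the class RoundRobinAssignSketch and its assign method are hypothetical names, plain task ids stand in for Configuration objects, and the input map is copied rather than emptied in place as doAssign() does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinAssignSketch {
    /** Distributes task ids over groupCount groups, taking one id per resource mark per pass. */
    static List<List<Integer>> assign(LinkedHashMap<String, List<Integer>> tasksByMark, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        // Find the longest task list; that is how many passes are needed.
        int longest = 0;
        List<List<Integer>> pending = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : tasksByMark.entrySet()) {
            pending.add(entry.getValue());
            longest = Math.max(longest, entry.getValue().size());
        }
        int next = 0; // index of the group that receives the next task
        for (int pass = 0; pass < longest; pass++) {
            for (List<Integer> ids : pending) {
                if (pass < ids.size()) {
                    groups.get(next % groupCount).add(ids.get(pass));
                    next++;
                }
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<Integer>> tasksByMark = new LinkedHashMap<>();
        tasksByMark.put("a", Arrays.asList(0, 1, 2));
        tasksByMark.put("b", Arrays.asList(3, 4));
        tasksByMark.put("c", Arrays.asList(5));
        // prints [[0, 4], [3, 2], [5], [1]]
        System.out.println(assign(tasksByMark, 4));
    }
}
```

With three resource marks and four groups, the tasks of mark "a" end up in three different groups, which is the load-balancing effect the resource mark is meant to achieve.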
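adjustChannelNumPerTaskGroup() is called from assignFairly(). It spreads the channels over the task groups: every group gets channelNumber / taskGroupNumber channels, and the first (channelNumber % taskGroupNumber) groups each get one extra channel, so that no channel is left unassigned.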
private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
int taskGroupNumber = taskGroupConfig.size();
int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
int remainderChannelCount = channelNumber % taskGroupNumber;
// remainderChannelCount taskGroups get avgChannelsPerTaskGroup + 1 Channels each;
// the other (taskGroupNumber - remainderChannelCount) taskGroups get avgChannelsPerTaskGroup Channels each
int i = 0;
for (; i < remainderChannelCount; i++) {
taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
}
for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
}
}
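A quick worked example of the arithmetic above, as a standalone sketch. The class ChannelSplitSketch and the method channelsPerGroup are hypothetical names; the division and remainder logic mirrors the listing.

```java
import java.util.ArrayList;
import java.util.List;

final class ChannelSplitSketch {
    /** Returns the channel count of each task group, in group order. */
    static List<Integer> channelsPerGroup(int channelNumber, int taskGroupNumber) {
        int avg = channelNumber / taskGroupNumber;
        int remainder = channelNumber % taskGroupNumber;
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < taskGroupNumber; i++) {
            // The first `remainder` groups absorb the channels left over by the integer division.
            result.add(i < remainder ? avg + 1 : avg);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 channels over 3 groups: 7 / 3 = 2, remainder 1, prints [3, 2, 2]
        System.out.println(channelsPerGroup(7, 3));
    }
}
```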
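Next, the task groups are started, which is where the actual data-extraction code of each Task begins to run. This is done by calling the schedule() method of StandAloneScheduler, that is, AbstractScheduler.schedule(). The most important call inside it is startAllTaskGroup(configurations), which starts all task groups; after that the method loops, collecting and reporting job statistics until the job succeeds or fails.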
public void schedule(List<Configuration> configurations) {
Validate.notNull(configurations,
"scheduler配置不能为空");
int jobReportIntervalInMillSec = configurations.get(0).getInt(
CoreConstant.DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000);
int jobSleepIntervalInMillSec = configurations.get(0).getInt(
CoreConstant.DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000);
this.jobId = configurations.get(0).getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
errorLimit = new ErrorRecordChecker(configurations.get(0));
/**
* Register a Communication for each taskGroupContainer
*/
this.containerCommunicator.registerCommunication(configurations);
int totalTasks = calculateTaskCount(configurations);
startAllTaskGroup(configurations);
Communication lastJobContainerCommunication = new Communication();
long lastReportTimeStamp = System.currentTimeMillis();
try {
while (true) {
/**
* step 1: collect job stat
* step 2: getReport info, then report it
* step 3: errorLimit do check
* step 4: dealSucceedStat();
* step 5: dealKillingStat();
* step 6: dealFailedStat();
* step 7: refresh last job stat, and then sleep for next while
*
* above steps, some ones should report info to DS
*
*/
Communication nowJobContainerCommunication = this.containerCommunicator.collect();
nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
LOG.debug(nowJobContainerCommunication.toString());
// Reporting interval
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
Communication reportCommunication = CommunicationTool
.getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
this.containerCommunicator.report(reportCommunication);
lastReportTimeStamp = now;
lastJobContainerCommunication = nowJobContainerCommunication;
}
errorLimit.checkRecordLimit(nowJobContainerCommunication);
if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
LOG.info("Scheduler accomplished all tasks.");
break;
}
if (isJobKilling(this.getJobId())) {
dealKillingStat(this.containerCommunicator, totalTasks);
} else if (nowJobContainerCommunication.getState() == State.FAILED) {
dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
}
Thread.sleep(jobSleepIntervalInMillSec);
}
} catch (InterruptedException e) {
// Exit in the FAILED state
LOG.error("捕获到InterruptedException异常!", e);
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
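The core of the whole framework
startAllTaskGroup(configurations) runs the Tasks in every taskGroup.
TaskGroupContainerRunner runs a TaskGroupContainer, which executes the tasks assigned to it. taskGroupContainerExecutorService is a fixed-size thread pool (one thread per task group) that executes the TaskGroupContainerRunner objects. Each runner's run() method calls taskGroupContainer.start(), which creates a TaskExecutor for each task, keeping at most channelNumber of them running at once, and starts each one with taskExecutor.doStart().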
public void startAllTaskGroup(List<Configuration> configurations) {
// Create a fixed thread pool sized to the number of taskGroups to run the TaskGroupContainerRunner objects
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
for (Configuration taskGroupConfiguration : configurations) {
// TaskGroupContainerRunner runs the TaskGroupContainer, which executes the tasks assigned to it
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}
private TaskGroupContainerRunner newTaskGroupContainerRunner(
Configuration configuration) {
TaskGroupContainer taskGroupContainer = new TaskGroupContainer(configuration);
return new TaskGroupContainerRunner(taskGroupContainer);
}
public class TaskGroupContainerRunner implements Runnable {
private TaskGroupContainer taskGroupContainer;
private State state;
public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
this.taskGroupContainer = taskGroup;
this.state = State.SUCCEEDED;
}
@Override
public void run() {
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
public TaskGroupContainer getTaskGroupContainer() {
return taskGroupContainer;
}
public State getState() {
return state;
}
public void setState(State state) {
this.state = state;
}
}
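The pattern used by startAllTaskGroup(), one pool thread per task group followed by an immediate shutdown(), can be tried in isolation. The sketch below is an assumption-laden stand-in: TaskGroupPoolSketch and runGroups are hypothetical names, a counter increment replaces taskGroupContainer.start(), and awaitTermination is added only so the demo can return a result (the scheduler in the listing polls Communication objects instead).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TaskGroupPoolSketch {
    /** Runs one runnable per group on a fixed pool and returns how many completed. */
    static int runGroups(int groupCount) {
        AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(groupCount);
        for (int i = 0; i < groupCount; i++) {
            final int groupId = i;
            pool.execute(() -> {
                Thread.currentThread().setName(String.format("taskGroup-%d", groupId));
                finished.incrementAndGet(); // stands in for taskGroupContainer.start()
            });
        }
        // shutdown() only stops new submissions; runners already submitted still run to completion.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // demo only, not in the DataX listing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runGroups(3)); // prints 3
    }
}
```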
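As shown above, what each pool thread ultimately executes is TaskGroupContainer.start(). It does two things: first it creates a TaskExecutor, whose constructor builds the threads that run the reader and the writer; then it calls taskExecutor.doStart(), which starts those threads and thereby begins the task's reading and writing.
taskExecutor.doStart() calls this.writerThread.start() and this.readerThread.start(), which means the plugin's Writer.Task and Reader.Task start reading and writing.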
public void start() {
try {
/**
* State-check interval; kept short so that tasks are dispatched to their channels promptly
*/
int sleepIntervalInMillSec = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
/**
* State-report interval; slightly longer, to avoid flooding with reports
*/
long reportIntervalInMillSec = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
10000);
/**
* Report performance statistics every 2 minutes
*/
// Get the number of channels
int channelNumber = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
int taskMaxRetryTimes = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
long taskRetryIntervalInMsec = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
List<Configuration> taskConfigs = this.configuration
.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
if(LOG.isDebugEnabled()) {
LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
JSON.toJSONString(taskConfigs));
}
int taskCountInThisTaskGroup = taskConfigs.size();
LOG.info(String.format(
"taskGroupId=[%d] start [%d] channels for [%d] tasks.",
this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
this.containerCommunicator.registerCommunication(taskConfigs);
Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); // taskId -> task configuration
List<Configuration> taskQueue = buildRemainTasks(taskConfigs); // tasks waiting to run
Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); // taskId -> executor of the last failed attempt
List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); // tasks currently running
Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); // task start times
long lastReportTimeStamp = 0;
Communication lastTaskGroupContainerCommunication = new Communication();
while (true) {
// 1. Check the state of each task
boolean failedOrKilled = false;
Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){
Integer taskId = entry.getKey();
Communication taskCommunication = entry.getValue();
if(!taskCommunication.isFinished()){
continue;
}
TaskExecutor taskExecutor = removeTask(runTasks, taskId);
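// The task was removed from runTasks above, so remove it from the monitor as well
taskMonitor.removeTask(taskId);
// Failed: retry only if the task supports failover and the retry limit has not been reached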
if(taskCommunication.getState() == State.FAILED){
taskFailedExecutorMap.put(taskId, taskExecutor);
if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){
taskExecutor.shutdown(); // shut down the old executor
containerCommunicator.resetCommunication(taskId); // reset the task's state
Configuration taskConfig = taskConfigMap.get(taskId);
taskQueue.add(taskConfig); // put the task back into the queue
}else{
failedOrKilled = true;
break;
}
}else if(taskCommunication.getState() == State.KILLED){
failedOrKilled = true;
break;
}else if(taskCommunication.getState() == State.SUCCEEDED){
Long taskStartTime = taskStartTimeMap.get(taskId);
if(taskStartTime != null){
Long usedTime = System.currentTimeMillis() - taskStartTime;
LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
this.taskGroupId, taskId, usedTime);
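// usedTime * 1000 * 1000 converts ms to the ns that PerfRecord stores; this is only a simple registration used to print the longest-running task, hence the dedicated static method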
PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L);
taskStartTimeMap.remove(taskId);
taskConfigMap.remove(taskId);
}
}
}
// 2. If the overall state of this taskGroup's taskExecutors is failed, report the error
if (failedOrKilled) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
}
// 3. There are tasks still waiting, and fewer tasks are running than the channel limit allows
Iterator<Configuration> iterator = taskQueue.iterator();
while(iterator.hasNext() && runTasks.size() < channelNumber){
Configuration taskConfig = iterator.next();
Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
int attemptCount = 1;
TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
if(lastExecutor!=null){
attemptCount = lastExecutor.getAttemptCount() + 1;
long now = System.currentTimeMillis();
long failedTime = lastExecutor.getTimeStamp();
if(now - failedTime < taskRetryIntervalInMsec){ // retry interval not yet elapsed, keep the task in the queue
continue;
}
if(!lastExecutor.isShutdown()){ // the previously failed attempt has still not ended
if(now - failedTime > taskMaxWaitInMsec){
markCommunicationFailed(taskId);
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
}else{
lastExecutor.shutdown(); // try to shut it down again
continue;
}
}else{
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
this.taskGroupId, taskId, lastExecutor.getAttemptCount());
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
iterator.remove();
runTasks.add(taskExecutor);
// The task was added to runTasks above, so register it with the monitor
taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
taskFailedExecutorMap.remove(taskId);
LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
this.taskGroupId, taskId, attemptCount);
}
// 4. The task queue is empty, all executors have finished and the collected state is SUCCEEDED: the taskGroup succeeded
if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
// Report once more even on success; otherwise the collected statistics would be inaccurate when tasks finish very quickly
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
break;
}
// 5. If the report interval has elapsed, report right away
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > reportIntervalInMillSec) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
lastReportTimeStamp = now;
// taskMonitor checks every running task once per reportIntervalInMillSec
for(TaskExecutor taskExecutor:runTasks){
taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
}
}
Thread.sleep(sleepIntervalInMillSec);
}
// 6. Report one final time
reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
} catch (Throwable e) {
Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
if (nowTaskGroupContainerCommunication.getThrowable() == null) {
nowTaskGroupContainerCommunication.setThrowable(e);
}
nowTaskGroupContainerCommunication.setState(State.FAILED);
this.containerCommunicator.report(nowTaskGroupContainerCommunication);
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}finally {
if(!PerfTrace.getInstance().isJob()){
// Finally print the average CPU usage and the GC statistics
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
}
}
}
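The failover branch in start() boils down to two small predicates, which the following sketch isolates. FailoverDecisionSketch and its method names are hypothetical; the comparisons are copied from the conditions in the listing (attemptCount < taskMaxRetryTimes, and now - failedTime < taskRetryIntervalInMsec to keep waiting).

```java
final class FailoverDecisionSketch {
    /** True when a failed task is queued again instead of failing the whole task group. */
    static boolean shouldRetry(boolean supportFailOver, int attemptCount, int taskMaxRetryTimes) {
        return supportFailOver && attemptCount < taskMaxRetryTimes;
    }

    /** True when a queued retry may start, i.e. the retry interval has elapsed since the failure. */
    static boolean retryIntervalElapsed(long nowMs, long failedTimeMs, long taskRetryIntervalInMsec) {
        return nowMs - failedTimeMs >= taskRetryIntervalInMsec;
    }

    public static void main(String[] args) {
        // The first attempt has attemptCount 1; with the fallback taskMaxRetryTimes of 1 there is no retry.
        System.out.println(shouldRetry(true, 1, 1));                   // prints false
        System.out.println(shouldRetry(true, 1, 3));                   // prints true
        System.out.println(shouldRetry(false, 1, 3));                  // prints false
        System.out.println(retryIntervalElapsed(15000, 10000, 10000)); // prints false
    }
}
```

One consequence worth noting: because the listing falls back to taskMaxRetryTimes = 1 and the first attempt is numbered 1, a task's first failure fails the whole task group unless the configuration sets a larger retry count and the writer reports supportFailOver().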
TaskExecutor
class TaskExecutor {
private Configuration taskConfig;
private int taskId;
private int attemptCount;
private Channel channel;
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
/**
* The taskCommunication here is used in several places:
* 1. channel
* 2. readerRunner and writerRunner
* 3. the taskPluginCollector of the reader and of the writer
*/
private Communication taskCommunication;
public TaskExecutor(Configuration taskConf, int attemptCount) {
// Get the configuration of this taskExecutor
this.taskConfig = taskConf;
Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
&& null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
"[reader|writer]的插件参数不能为空!");
// Get the taskId
this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
this.attemptCount = attemptCount;
/**
* Look up this taskExecutor's Communication by taskId;
* it is passed to readerRunner and writerRunner, and also to the channel for statistics
*/
this.taskCommunication = containerCommunicator
.getCommunication(taskId);
Validate.notNull(this.taskCommunication,
String.format("taskId[%d]的Communication没有注册过", taskId));
this.channel = ClassUtil.instantiate(channelClazz,
Channel.class, configuration);
this.channel.setCommunication(this.taskCommunication);
/**
* Get the transformer parameters
*/
List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
/**
* Create the writerThread
*/
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
String.format("%d-%d-%d-writer",
jobId, taskGroupId, this.taskId));
// Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.taskConfig.getString(
CoreConstant.JOB_WRITER_NAME)));
/**
* Create the readerThread
*/
readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
String.format("%d-%d-%d-reader",
jobId, taskGroupId, this.taskId));
/**
* Setting the thread's contextClassLoader lets the plugin use a class loader different from the main program's
*/
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.taskConfig.getString(
CoreConstant.JOB_READER_NAME)));
}
public void doStart() {
this.writerThread.start();
// The reader has not started yet, so the writer cannot have finished
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
this.readerThread.start();
// The reader may finish very quickly here
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
// The reader thread may die right after starting; in that case throw immediately
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
}
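The per-thread class loader mentioned in the constructor comments relies only on standard java.lang.Thread behaviour, which the following sketch demonstrates. ContextClassLoaderSketch and loaderSeenByThread are hypothetical names, and an empty URLClassLoader stands in for whatever LoadUtil.getJarLoader returns.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class ContextClassLoaderSketch {
    /** Starts a thread with the given context class loader and returns the loader that thread observes. */
    static ClassLoader loaderSeenByThread(ClassLoader pluginLoader) {
        final ClassLoader[] seen = new ClassLoader[1];
        Thread worker = new Thread(
                () -> seen[0] = Thread.currentThread().getContextClassLoader(),
                "0-0-0-writer");
        // Must be set before start(); code running on the thread can then resolve plugin classes through it.
        worker.setContextClassLoader(pluginLoader);
        worker.start();
        try {
            worker.join(); // join() makes the write to seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        ClassLoader pluginLoader = new URLClassLoader(new URL[0]); // stand-in for a plugin jar loader
        System.out.println(loaderSeenByThread(pluginLoader) == pluginLoader); // prints true
    }
}
```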
ReaderRunner
public class ReaderRunner extends AbstractRunner implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(ReaderRunner.class);
private RecordSender recordSender;
public void setRecordSender(RecordSender recordSender) {
this.recordSender = recordSender;
}
public ReaderRunner(AbstractTaskPlugin abstractTaskPlugin) {
super(abstractTaskPlugin);
}
@Override
public void run() {
assert null != this.recordSender;
Reader.Task taskReader = (Reader.Task) this.getPlugin();
// Measure waitWriterTime; it is only ended in the finally block
PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
try {
channelWaitWrite.start();
LOG.debug("task reader starts to do init ...");
PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
initPerfRecord.start();
taskReader.init();
initPerfRecord.end();
LOG.debug("task reader starts to do prepare ...");
PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
preparePerfRecord.start();
taskReader.prepare();
preparePerfRecord.end();
LOG.debug("task reader starts to read ...");
PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
dataPerfRecord.start();
taskReader.startRead(recordSender);
recordSender.terminate();
dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
dataPerfRecord.end();
LOG.debug("task reader starts to do post ...");
PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
postPerfRecord.start();
taskReader.post();
postPerfRecord.end();
// automatic flush
// super.markSuccess(); success must not be marked here; it is marked by the WriterRunner (otherwise the reader could finish first while the writer is still running, a serious bug)
} catch (Throwable e) {
LOG.error("Reader runner Received Exceptions:", e);
super.markFail(e);
} finally {
LOG.debug("task reader starts to do destroy ...");
PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
desPerfRecord.start();
super.destroy();
desPerfRecord.end();
channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));
long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
if (transformerUsedTime > 0) {
PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
transformerRecord.start();
transformerRecord.end(transformerUsedTime);
}
}
}
public void shutdown(){
recordSender.shutdown();
}
}
WriterRunner
public class WriterRunner extends AbstractRunner implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(WriterRunner.class);
private RecordReceiver recordReceiver;
public void setRecordReceiver(RecordReceiver receiver) {
this.recordReceiver = receiver;
}
public WriterRunner(AbstractTaskPlugin abstractTaskPlugin) {
super(abstractTaskPlugin);
}
@Override
public void run() {
Validate.isTrue(this.recordReceiver != null);
Writer.Task taskWriter = (Writer.Task) this.getPlugin();
// Measure waitReadTime; it is ended in the finally block
PerfRecord channelWaitRead = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_READ_TIME);
try {
channelWaitRead.start();
LOG.debug("task writer starts to do init ...");
PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_INIT);
initPerfRecord.start();
taskWriter.init();
initPerfRecord.end();
LOG.debug("task writer starts to do prepare ...");
PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_PREPARE);
preparePerfRecord.start();
taskWriter.prepare();
preparePerfRecord.end();
LOG.debug("task writer starts to write ...");
PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DATA);
dataPerfRecord.start();
taskWriter.startWrite(recordReceiver);
dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
dataPerfRecord.end();
LOG.debug("task writer starts to do post ...");
PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_POST);
postPerfRecord.start();
taskWriter.post();
postPerfRecord.end();
super.markSuccess();
} catch (Throwable e) {
LOG.error("Writer Runner Received Exceptions:", e);
super.markFail(e);
} finally {
LOG.debug("task writer starts to do destroy ...");
PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DESTROY);
desPerfRecord.start();
super.destroy();
desPerfRecord.end();
channelWaitRead.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_READER_TIME));
}
}
public boolean supportFailOver(){
Writer.Task taskWriter = (Writer.Task) this.getPlugin();
return taskWriter.supportFailOver();
}
public void shutdown(){
recordReceiver.shutdown();
}
}
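5. Data transfer in DataX
The Reader plugin and the Writer plugin also exchange data through a channel. The channel may be in-memory or persistent; plugins do not need to care which. A plugin writes data into the channel through RecordSender and reads data from the channel through RecordReceiver. Each item in the channel is a Record object, and a Record can hold multiple Column objects; think of them as a row and its columns in a database. The DefaultRecord implementation below mainly handles operations on columns and keeps track of the memory the record occupies.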
public class DefaultRecord implements Record {
private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;
private List<Column> columns;
private int byteSize;
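// Start with the memory needed by the Record itself
private int memorySize = ClassSize.DefaultRecordHead; // looks unused here, but it feeds one of the wait conditions evaluated under the reentrant lock when data is written into the channel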
public DefaultRecord() {
this.columns = new ArrayList<Column>(RECORD_AVERGAE_COLUMN_NUMBER);
}
@Override
public void addColumn(Column column) {
columns.add(column);
incrByteSize(column);
}
@Override
public Column getColumn(int i) {
if (i < 0 || i >= columns.size()) {
return null;
}
return columns.get(i);
}
@Override
public void setColumn(int i, final Column column) {
if (i < 0) {
throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
"不能给index小于0的column设置值");
}
if (i >= columns.size()) {
expandCapacity(i + 1);
}
decrByteSize(getColumn(i));
this.columns.set(i, column);
incrByteSize(getColumn(i));
}
@Override
public String toString() {
Map<String, Object> json = new HashMap<String, Object>();
json.put("size", this.getColumnNumber());
json.put("data", this.columns);
return JSON.toJSONString(json);
}
@Override
public int getColumnNumber() {
return this.columns.size();
}
@Override
public int getByteSize() {
return byteSize;
}
public int getMemorySize(){
return memorySize;
}
private void decrByteSize(final Column column) {
if (null == column) {
return;
}
byteSize -= column.getByteSize();
// Memory usage is the Column object header plus the actual data size
memorySize = memorySize - ClassSize.ColumnHead - column.getByteSize();
}
private void incrByteSize(final Column column) {
if (null == column) {
return;
}
byteSize += column.getByteSize();
// Memory usage is the Column object header plus the actual data size
memorySize = memorySize + ClassSize.ColumnHead + column.getByteSize();
}
private void expandCapacity(int totalSize) {
if (totalSize <= 0) {
return;
}
int needToExpand = totalSize - columns.size();
while (needToExpand-- > 0) {
this.columns.add(null);
}
}
}
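The two counters kept by DefaultRecord can be followed with a reduced sketch. RecordSizeSketch is a hypothetical class, byte arrays stand in for Column objects, and the header sizes RECORD_HEAD and COLUMN_HEAD are made-up values (the real ones come from ClassSize, which is not shown here).

```java
import java.util.ArrayList;
import java.util.List;

final class RecordSizeSketch {
    static final int RECORD_HEAD = 32; // hypothetical stand-in for ClassSize.DefaultRecordHead
    static final int COLUMN_HEAD = 8;  // hypothetical stand-in for ClassSize.ColumnHead

    private final List<byte[]> columns = new ArrayList<>();
    private int byteSize = 0;             // payload bytes only
    private int memorySize = RECORD_HEAD; // payload plus object headers

    void addColumn(byte[] value) {
        columns.add(value);
        incr(value);
    }

    void setColumn(int index, byte[] value) {
        while (index >= columns.size()) {
            columns.add(null); // pad with empty slots, as expandCapacity does
        }
        decr(columns.get(index)); // remove the old column's contribution first
        columns.set(index, value);
        incr(value);
    }

    private void incr(byte[] value) {
        if (value == null) return;
        byteSize += value.length;
        memorySize += COLUMN_HEAD + value.length;
    }

    private void decr(byte[] value) {
        if (value == null) return;
        byteSize -= value.length;
        memorySize -= COLUMN_HEAD + value.length;
    }

    int getByteSize() { return byteSize; }
    int getMemorySize() { return memorySize; }

    public static void main(String[] args) {
        RecordSizeSketch record = new RecordSizeSketch();
        record.addColumn(new byte[10]);    // byteSize 10, memorySize 32 + 8 + 10 = 50
        record.setColumn(0, new byte[4]);  // byteSize 4,  memorySize 32 + 8 + 4  = 44
        record.setColumn(2, new byte[6]);  // slot 1 stays null; byteSize 10, memorySize 58
        System.out.println(record.getByteSize() + " " + record.getMemorySize()); // prints 10 58
    }
}
```

Null padding slots contribute nothing to either counter, which is why overwriting a column must subtract the old value before adding the new one.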
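The interaction between the data and the channel starts with the call to the generateRunner method, which creates the ReaderRunner and the WriterRunner and hands each of them an exchanger bound to the task's channel.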
private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
AbstractRunner newRunner = null;
TaskPluginCollector pluginCollector;
switch (pluginType) {
case READER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER));
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.READER);
RecordSender recordSender;
if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
} else {
recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
}
// Bind the record sender (the data path) to the runner thread
((ReaderRunner) newRunner).setRecordSender(recordSender);
/**
* Set the taskPlugin's collector, used to handle dirty records and job/task communication
*/
newRunner.setTaskPluginCollector(pluginCollector);
break;
case WRITER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.WRITER);
// Bind the receiver for the data the reader wrote into the channel to the runner thread
((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
this.channel, pluginCollector));
/**
* Set the taskPlugin's collector, used to handle dirty records and job/task communication
*/
newRunner.setTaskPluginCollector(pluginCollector);
break;
default:
throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
}
newRunner.setTaskGroupId(taskGroupId);
newRunner.setTaskId(this.taskId);
newRunner.setRunnerCommunication(this.taskCommunication);
return newRunner;
}
Reader and channel interaction
The data exchange between the reader and the channel is implemented by the BufferedRecordExchanger class.
public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
private final Channel channel;
private final Configuration configuration;
private final List<Record> buffer; // buffers records for batch submission; bufferSize controls the batch size
private int bufferSize ;
protected final int byteCapacity;
private final AtomicInteger memoryBytes = new AtomicInteger(0);
private final AtomicInteger dirtyDataNum = new AtomicInteger(0); // number of dirty records
private final AtomicInteger totalDataNum = new AtomicInteger(0); // total number of records
private int bufferIndex = 0;
private static Class<? extends Record> RECORD_CLASS;
private volatile boolean shutdown = false;
private final TaskPluginCollector pluginCollector;
@Override
public AtomicInteger getDirtyDataNum(){
return dirtyDataNum;
}
@Override
public AtomicInteger getTotalDataNum() {
return totalDataNum;
}
@SuppressWarnings("unchecked")
public BufferedRecordExchanger(final Channel channel, final TaskPluginCollector pluginCollector) {
assert null != channel;
assert null != channel.getConfiguration();
this.channel = channel;
this.pluginCollector = pluginCollector;
this.configuration = channel.getConfiguration();
this.bufferSize = configuration
.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
this.buffer = new ArrayList<Record>(bufferSize);
// The channel queue defaults to 8 MB (it used to be 64 MB)
this.byteCapacity = configuration.getInt(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
try {
BufferedRecordExchanger.RECORD_CLASS = ((Class<? extends Record>) Class
.forName(configuration.getString(
CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS,
"com.alibaba.datax.core.transport.record.DefaultRecord")));
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR, e);
}
}
@Override
public Record createRecord() {
try {
return BufferedRecordExchanger.RECORD_CLASS.newInstance();
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR, e);
}
}
@Override
// Called from the reader's Task.startRead method to write data into the channel
public void sendToWriter(Record record) {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
Validate.notNull(record, "record不能为空.");
if (record.getMemorySize() > this.byteCapacity) {
this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
return;
}
boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
if (isFull) {
flush();
}
this.buffer.add(record);
this.bufferIndex++;
memoryBytes.addAndGet(record.getMemorySize());
}
@Override
public void flush() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
this.channel.pushAll(this.buffer);
this.buffer.clear();
this.bufferIndex = 0;
this.memoryBytes.set(0);
}
@Override
public void terminate() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
flush();
this.channel.pushTerminate(TerminateRecord.get());
}
@Override
public Record getFromReader() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
boolean isEmpty = (this.bufferIndex >= this.buffer.size());
if (isEmpty) {
receive();
}
Record record = this.buffer.get(this.bufferIndex++);
if (record instanceof TerminateRecord) {
record = null;
}
return record;
}
@Override
public void shutdown(){
shutdown = true;
try{
buffer.clear();
channel.clear();
}catch(Throwable t){
t.printStackTrace();
}
}
private void receive() {
this.channel.pullAll(this.buffer);
this.bufferIndex = 0;
this.bufferSize = this.buffer.size();
}
}
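The batching and end-of-stream handling of BufferedRecordExchanger can be reduced to a few lines. The sketch below is a simplification under stated assumptions: BufferedExchangeSketch is a hypothetical class, strings stand in for Record objects, a sentinel string stands in for TerminateRecord, the byte-capacity check and the shutdown flag are omitted, and the receiving side takes one record at a time instead of pulling a whole batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BufferedExchangeSketch {
    // Sentinel marking the end of the stream; compared by identity, like an instanceof check.
    private static final String TERMINATE = new String("<terminate>");

    private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    private final List<String> buffer = new ArrayList<>();
    private final int bufferSize;
    private int flushCount = 0;

    BufferedExchangeSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Reader side: buffer the record, flushing first when the buffer is already full. */
    void sendToWriter(String record) {
        if (buffer.size() >= bufferSize) {
            flush();
        }
        buffer.add(record);
    }

    /** Pushes the whole buffer into the channel in one batch. */
    void flush() {
        channel.addAll(buffer);
        buffer.clear();
        flushCount++;
    }

    /** Flushes what is left and then signals end of stream. */
    void terminate() {
        flush();
        channel.add(TERMINATE);
    }

    /** Writer side: returns the next record, or null once the terminate sentinel arrives. */
    String getFromReader() {
        try {
            String record = channel.take();
            return record == TERMINATE ? null : record;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    int getFlushCount() { return flushCount; }

    public static void main(String[] args) {
        BufferedExchangeSketch exchanger = new BufferedExchangeSketch(2);
        for (int i = 0; i < 5; i++) {
            exchanger.sendToWriter("r" + i);
        }
        exchanger.terminate();
        List<String> received = new ArrayList<>();
        String record;
        while ((record = exchanger.getFromReader()) != null) {
            received.add(record);
        }
        // prints [r0, r1, r2, r3, r4] flushes=3
        System.out.println(received + " flushes=" + exchanger.getFlushCount());
    }
}
```

Five records with a buffer of two reach the channel in three batches (2, 2 and 1), and the writer loop ends on the null that replaces the sentinel, which is the same contract a Writer.Task relies on when getFromReader() returns null.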
Below, taking mysqlReader as an example, we look at how the startRead method interacts with the channel.
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
super.getTaskPluginCollector(), fetchSize);
}
public void startRead(Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
LOG.info("Begin to read record by Sql: [{}\n] {}.",
querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
username, password);
// session config .etc related
DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg);
int columnNumber = 0;
ResultSet rs = null;
try {
rs = DBUtil.query(conn, querySql, fetchSize);
queryPerfRecord.end();
ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();
// Measures the pure ResultSet.next() time
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
rsNextUsedTime += (System.nanoTime() - lastTime);
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}
allResultPerfRecord.end(rsNextUsedTime);
// The monitoring dashboard currently depends on this log line; previously this "Finished read record" covered the whole time of the SQL query plus result next
LOG.info("Finished read record by Sql: [{}\n] {}.",
querySql, basicMsg);
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
}
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,
ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
TaskPluginCollector taskPluginCollector) {
Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector);
recordSender.sendToWriter(record);
return record;
}
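At this point we can see that the reader interacts with the channel through the sendToWriter method. When the buffer is full, sendToWriter calls flush(), which in turn calls pushAll(this.buffer) on the abstract class Channel to push the buffered records into the channel's queue. That completes the interaction between the reader and the channel; correspondingly, fetching data on the writer side means pulling the records out of that queue.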
public void pushAll(final Collection<Record> rs) {
Validate.notNull(rs);
Validate.noNullElements(rs);
this.doPushAll(rs);
this.statPush(rs.size(), this.getByteSize(rs));
}
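// A reentrant lock plus an array-backed, bounded, thread-safe blocking queue (why a bounded blocking queue is combined with a reentrant lock is left for later)
private ArrayBlockingQueue<Record> queue = null;
private ReentrantLock lock;
@Override
protected void doPushAll(Collection<Record> rs) {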
try {
long startTime = System.nanoTime();
lock.lockInterruptibly();
int bytes = getRecordBytes(rs);
while (memoryBytes.get() + bytes > this.byteCapacity || rs.size() > this.queue.remainingCapacity()) {
notInsufficient.await(200L, TimeUnit.MILLISECONDS);
}
this.queue.addAll(rs);
waitWriterTime += System.nanoTime() - startTime;
memoryBytes.addAndGet(bytes);
notEmpty.signalAll();
} catch (InterruptedException e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
lock.unlock();
}
}
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
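// Recording the wait counters on the read (push) side is sufficient: the write (pull) side may be blocked right now, but the read side can already see the counter accumulated during that block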
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
// Channel rate limiting
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// Compute the sleep time implied by the byte limit
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// Compute the sleep time implied by the record limit
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// Use the larger of the two sleep times
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}
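To make the throttling arithmetic in statPush concrete, here is a minimal standalone sketch of the same formula. `ThrottleMath` and `sleepMillis` are illustrative names, not part of DataX. The speed limit is in units (bytes or records) per second and the interval is in milliseconds; the guard for a non-positive interval is an addition of this sketch.

```java
final class ThrottleMath {
    /**
     * If the observed speed exceeds the limit, returns how long to sleep so that
     * the average over (interval + sleep) falls back to the limit; otherwise 0.
     */
    static long sleepMillis(long unitsInInterval, long intervalMs, long limitPerSecond) {
        if (limitPerSecond <= 0 || intervalMs <= 0) {
            return 0; // no limit configured, or nothing to measure (guard added in this sketch)
        }
        long currentSpeed = unitsInInterval * 1000 / intervalMs; // units per second
        if (currentSpeed <= limitPerSecond) {
            return 0;
        }
        return currentSpeed * intervalMs / limitPerSecond - intervalMs;
    }

    public static void main(String[] args) {
        // 2,000,000 bytes observed in 1000 ms against a 1,000,000 B/s limit
        long byteSleep = sleepMillis(2_000_000L, 1000L, 1_000_000L);
        // 500 records observed in 1000 ms against a 1000 records/s limit
        long recordSleep = sleepMillis(500L, 1000L, 1000L);
        // As in statPush, the larger of the two sleep times wins
        System.out.println(Math.max(byteSleep, recordSleep));
    }
}
```

In the first call the observed speed is twice the limit, so the thread would sleep for as long as the interval itself (1000 ms), halving the average rate.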
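Writer-channel interaction
The reader writes data into the channel's ArrayBlockingQueue, so how does the writer get that data back out of the channel? There is a neat design here: TaskExecutor binds the task's execution threads to the channel through its channel member variable. The same channel is handed to a BufferedRecordExchanger on each side, and BufferedRecordExchanger implements both RecordSender and RecordReceiver.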
class TaskExecutor {
private Configuration taskConfig;
private int taskId;
private int attemptCount;
private Channel channel; // the channel bound to this task
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
/**
* The taskCommunication here is used in several places:
* 1. channel
* 2. readerRunner and writerRunner
* 3. the taskPluginCollector of the reader and the writer
*/
private Communication taskCommunication;
public TaskExecutor(Configuration taskConf, int attemptCount) {
// Get the configuration of this taskExecutor
this.taskConfig = taskConf;
Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
&& null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
"[reader|writer]的插件参数不能为空!"); // message: "the plugin parameters of [reader|writer] must not be empty!"
// Get the taskId
this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
this.attemptCount = attemptCount;
/**
* Obtain this taskExecutor's Communication from the taskId.
* It is passed to readerRunner and writerRunner, and also to the channel for statistics.
*/
this.taskCommunication = containerCommunicator
.getCommunication(taskId);
Validate.notNull(this.taskCommunication,
String.format("taskId[%d]的Communication没有注册过", taskId)); // message: "the Communication of taskId[%d] has not been registered"
this.channel = ClassUtil.instantiate(channelClazz,
Channel.class, configuration);
this.channel.setCommunication(this.taskCommunication);
/**
* Get the transformer parameters
*/
List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
/**
* Create the writerThread
*/
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
String.format("%d-%d-%d-writer",
jobId, taskGroupId, this.taskId));
// Setting the thread's contextClassLoader lets the plugin run with a class loader different from the main program's
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.taskConfig.getString(
CoreConstant.JOB_WRITER_NAME)));
/**
* Create the readerThread
*/
readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
String.format("%d-%d-%d-reader",
jobId, taskGroupId, this.taskId));
/**
* Setting the thread's contextClassLoader lets the plugin run with a class loader different from the main program's
*/
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.taskConfig.getString(
CoreConstant.JOB_READER_NAME)));
}
public void doStart() {
this.writerThread.start();
// The reader has not started yet, so the writer cannot have finished
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
this.readerThread.start();
// The reader may finish very quickly here
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
// The reader may die immediately on startup; in that case the exception must be thrown right away
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
}
private AbstractRunner generateRunner(PluginType pluginType) {
return generateRunner(pluginType, null);
}
private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
AbstractRunner newRunner = null;
TaskPluginCollector pluginCollector;
switch (pluginType) {
case READER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER));
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.READER);
RecordSender recordSender;
if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
} else {
recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
}
((ReaderRunner) newRunner).setRecordSender(recordSender);
/**
* Set the taskPlugin's collector, used to handle dirty data and job/task communication
*/
newRunner.setTaskPluginCollector(pluginCollector);
break;
case WRITER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.WRITER);
// Bind the channel the reader has written into to the writerRunner
((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
this.channel, pluginCollector));
/**
* Set the taskPlugin's collector, used to handle dirty data and job/task communication
*/
newRunner.setTaskPluginCollector(pluginCollector);
break;
default:
throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
}
newRunner.setTaskGroupId(taskGroupId);
newRunner.setTaskId(this.taskId);
newRunner.setRunnerCommunication(this.taskCommunication);
return newRunner;
}
// Check whether the task has finished
private boolean isTaskFinished() {
// If the reader or the writer has not finished its work, report the task as unfinished
if (readerThread.isAlive() || writerThread.isAlive()) {
return false;
}
if(taskCommunication==null || !taskCommunication.isFinished()){
return false;
}
return true;
}
private int getTaskId(){
return taskId;
}
private long getTimeStamp(){
return taskCommunication.getTimestamp();
}
private int getAttemptCount(){
return attemptCount;
}
private boolean supportFailOver(){
return writerRunner.supportFailOver();
}
private void shutdown(){
writerRunner.shutdown();
readerRunner.shutdown();
if(writerThread.isAlive()){
writerThread.interrupt();
}
if(readerThread.isAlive()){
readerThread.interrupt();
}
}
private boolean isShutdown(){
return !readerThread.isAlive() && !writerThread.isAlive();
}
}
}
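The binding described above can be illustrated with a small self-contained sketch: two exchanger objects wrap the same queue, one used as the sender by the reader thread and one as the receiver by the writer thread, and the writer thread is started first as in doStart. Everything here is a hypothetical stand-in: `MiniExchanger` and the `END` marker are invented for illustration, records are plain strings, and a bare ArrayBlockingQueue replaces DataX's Channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class MiniExchanger {
    static final String END = "__END__"; // hypothetical end-of-stream marker

    private final BlockingQueue<String> channel;

    MiniExchanger(BlockingQueue<String> channel) {
        this.channel = channel;
    }

    // Sender side, called from the reader thread; blocks while the queue is full
    void sendToWriter(String record) throws InterruptedException {
        channel.put(record);
    }

    // Receiver side, called from the writer thread; returns null at end of stream
    String getFromReader() throws InterruptedException {
        String record = channel.take();
        return END.equals(record) ? null : record;
    }

    /** Runs one reader and one writer over a shared queue and returns what the writer received. */
    static List<String> transfer(List<String> source) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        MiniExchanger sender = new MiniExchanger(queue);   // both sides wrap
        MiniExchanger receiver = new MiniExchanger(queue); // the same queue
        List<String> written = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                String r;
                while ((r = receiver.getFromReader()) != null) {
                    written.add(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");
        Thread reader = new Thread(() -> {
            try {
                for (String r : source) {
                    sender.sendToWriter(r);
                }
                sender.sendToWriter(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        writer.start(); // writer first, so it is already waiting when data arrives
        reader.start();
        try {
            reader.join();
            writer.join(); // join makes the writer's additions visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the transfer", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("a", "b", "c")));
    }
}
```

Because the queue holds only two records, the reader blocks in put() whenever the writer falls behind, which is the same back-pressure effect the bounded channel provides.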
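Taking janusGraphWriter as an example, let's see how the data is taken out. The run method of WriterRunner eventually calls the writer's startWrite(lineReceiver) method, which in turn calls lineReceiver.getFromReader() to obtain records; that call ultimately invokes the channel's pullAll method to fetch the data the reader pushed into the queue: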
public Record getFromReader() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
boolean isEmpty = (this.bufferIndex >= this.buffer.size());
if (isEmpty) {
receive();
}
Record record = this.buffer.get(this.bufferIndex++);
if (record instanceof TerminateRecord) {
record = null;
}
return record;
}
private void receive() {
this.channel.pullAll(this.buffer);
this.bufferIndex = 0;
this.bufferSize = this.buffer.size();
}
public void pullAll(final Collection<Record> rs) {
Validate.notNull(rs);
this.doPullAll(rs);
this.statPull(rs.size(), this.getByteSize(rs));
}
@Override
protected void doPullAll(Collection<Record> rs) {
assert rs != null;
rs.clear();
try {
long startTime = System.nanoTime();
lock.lockInterruptibly();
while (this.queue.drainTo(rs, bufferSize) <= 0) {
notEmpty.await(200L, TimeUnit.MILLISECONDS);
}
waitReaderTime += System.nanoTime() - startTime;
int bytes = getRecordBytes(rs);
memoryBytes.addAndGet(-bytes);
notInsufficient.signalAll();
} catch (InterruptedException e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
} finally {
lock.unlock();
}
}
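The push and pull methods shown in this walkthrough follow the classic bounded-buffer pattern with one lock and two conditions. Below is a minimal sketch of that pattern, assuming string records and a record-count capacity only (the byte-capacity check is left out). `MiniChannel` is a hypothetical name, not a DataX class. Unlike the excerpt, the sketch acquires the lock before entering the try block, so unlock() runs only when the lock is actually held.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class MiniChannel {
    private final int capacity;
    private final Deque<String> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    MiniChannel(int capacity) {
        this.capacity = capacity;
    }

    /** Adds the whole batch at once, waiting until there is room for all of it. */
    void pushAll(Collection<String> batch) throws InterruptedException {
        if (batch.size() > capacity) {
            throw new IllegalArgumentException("batch can never fit into the channel");
        }
        lock.lockInterruptibly();
        try {
            while (batch.size() > capacity - queue.size()) {
                notFull.await(200L, TimeUnit.MILLISECONDS); // re-check periodically
            }
            queue.addAll(batch);
            notEmpty.signalAll(); // wake any waiting puller
        } finally {
            lock.unlock();
        }
    }

    /** Replaces the contents of out with up to max queued records, waiting for at least one. */
    void pullAll(List<String> out, int max) throws InterruptedException {
        out.clear();
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(200L, TimeUnit.MILLISECONDS);
            }
            while (out.size() < max && !queue.isEmpty()) {
                out.add(queue.poll());
            }
            notFull.signalAll(); // wake any waiting pusher
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniChannel channel = new MiniChannel(3);
        channel.pushAll(Arrays.asList("a", "b"));
        List<String> buffer = new ArrayList<>();
        channel.pullAll(buffer, 10);
        System.out.println(buffer);
    }
}
```

Holding one lock for the whole batch is what lets pushAll check the remaining capacity and add every record as a single step, which the queue's own per-element locking would not guarantee.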
That completes the walkthrough of the overall DataX execution flow.
Plugin development
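Once the overall DataX execution flow is clear, developing a plugin is not particularly difficult. The work mostly consists of overriding the **Reader.startRead()** and **Writer.startWrite()** methods, which is largely business-specific logic and is not covered further here. For the development process, see the Alibaba DataX plugin development guide (阿里Datax插件开发文档); for working examples, see the janusGraphWriter or elasticsearchWriter plugin code.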
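As a rough illustration of the main loop a writer task typically ends up with, here is a self-contained sketch. The `RecordReceiver` interface below is a local stand-in with string records, not the DataX interface, and `BatchingWriterTask` is a hypothetical name. A real plugin would extend the DataX Writer classes described in the plugin development guide and write to an actual data store instead of an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Local stand-in for the framework's receiver: returns null once the reader is done
interface RecordReceiver {
    String getFromReader();
}

final class BatchingWriterTask {
    private final int batchSize;
    // Stands in for the target store: each element is one flushed batch
    private final List<List<String>> flushed = new ArrayList<>();

    BatchingWriterTask(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Pulls records until the receiver signals the end, flushing them in batches. */
    void startWrite(RecordReceiver lineReceiver) {
        List<String> batch = new ArrayList<>(batchSize);
        String record;
        while ((record = lineReceiver.getFromReader()) != null) {
            batch.add(record);
            if (batch.size() >= batchSize) {
                flush(batch);
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {
            flush(batch); // write the trailing partial batch
        }
    }

    private void flush(List<String> batch) {
        flushed.add(batch); // a real writer would issue its bulk write here
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("r1", "r2", "r3").iterator();
        BatchingWriterTask task = new BatchingWriterTask(2);
        task.startWrite(() -> source.hasNext() ? source.next() : null);
        System.out.println(task.flushedBatches());
    }
}
```

Batching is a design choice of this sketch rather than a framework requirement; it mirrors the common practice of amortizing round trips to the target system.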