Kettle Job机制
1 Job机制
一个job项代表ETL控制流中的一项逻辑任务。Job项将会顺序执行,每个job项会产生一个结果,能作为别的分支上job项的条件。
1.1 Job类图简介
1.2 JobEntryInteface接口
JobEntryInterface是Job Entry插件的主要实现接口。主要包含以下功能:
1)
保存Job Entry设置
实现类使用私有变量保存设置的参数,通过get、set方法获取和设置。Dialog实现类会通过这些方法,保存或设置设置界面上的参数。同时,需要提供一个深度拷贝的方法,因为在一些保存参数且可能修改的地方会调用。
2) 序列化插件
插件要实现对本插件的序列化,实现两种方式xml与数据库。
3) 输出信息提供
一个job entry支持三种类型的输出:true、flase和无条件。这三种情况不是所有的job entry插件都会同时支持的,例如dummy job entry仅支持true和false。所以,插件必须显现两类函数,来查看支持哪种结果。
public booleanevaluates()//是否支持true、false
public booleanisUnconditional()//是否支持无条件执行
4) 执行任务
负责工作的执行。
public Resultexecute()
//执行具体的逻辑,需要结果和开始到该项的距离
prev_result.setNrErrors()//设置执行过程中的异常数
prev_result.setResult()//设置结果,如果不知道true/false,结果不设置
最后返回prev_result。
1.3 JobEntryDialogInteface接口
负责构建和打开参数设置对话框。Spoon通过调用open函数打开该对话框,spoon是使用swt框架的,所以对话框也应使用swt来实现。
2 Job entry交互通信类
Result: 每一个jobEntryInterface的实现类在完成相应功能时,返回结果的类型, 主要成员变量:
private booleanresult;执行是否出现异常
private intexitStatus; 执行结果状态
privateList<RowMetaAndData> rows;
一个jobEntry完成处理后的数据(若存在)
privateMap<String, ResultFile> resultFiles;
3 Job配置及开启
Job开启时序图
Job的开启与Trans相类似,配置执行的参数,检查.kjb文件是否发生变化,实例化一个Job对象,开启该线程。
4 Job执行
4.1 初始执行excute1( )
主要工作是从JobMeta的JobHopMeta找到job入口jobentry信息,根据开始条件调用真正执行jobentry的execute方法2,代码如下所示:
startpoint=jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START,0, false);// 找到Job开始组件
JobEntrySpecialjes = (JobEntrySpecial) startpoint.getEntry();
//JobEntrySpecial是启动job的job项目
Result res =null;
while ((jes.isRepeat() || isFirst) && !isStopped()){
//符合开始条件时,调用execute方法2
isFirst = false;
res = execute(0,null, startpoint, null,
Messages.getString(“Job.Reason.Started”));
}
4.2 实际执行excute2()
execute()方法包含,的参数有执行次数(START不算,从0开始,顺序执行)、接一个Entry执行结果、当前Entry的拷贝、前一个Entry拷贝和原因。
主要功能是根据参数startpoint,提取对应的jobentry,执行对应的jobentry操作,再根据JobMeta的hop信息依次得到下一个jobentry,递归调用。具体的执行步骤如下所示:
Job执行步骤:
5 JobEntry执行
5.1 JobEntry类
具体每个组件的执行体对应org.pentaho.di.job.entries包内每个entry的具体实现。
execute()方法2中调用jobEntry的execute()完成jobEntry的具体功能。
5.2不同jobEntry的实现
final
Resultresult = cloneJei.execute(prevResult, nr, rep,
this
);
不同的Job项目(JobEntry)实现差别很大。
JobEntrySpecial:
功能是开启一个job,只是简单地对传递来的preResult设置它的的result属性值为true,(Job项目据此判断前一结果执行完毕)。返回该对象即可。
JobEntryTableExit:
功能是判断一个table是否存在数据库中。JobEntryTableExitJob项目有属性tablename和DatabaseMeta(对数据库的元数据信息描述)根据DatabaseMeta得到一个Dabase对象 db,建立连接db.connect(); 调用db.checkTableExists(tablename)根据此返回值设置preResult的result属性为否为true。返回 preResult对象。
JobEntryTrans:
JobEntryJob和JobEntryTrans是嵌套job或trans的Job项目(JobEntry)。它们是比较复杂的job项目。
作用是执行一个trans。首先实例化一个TransMeta,之后实例化Trans。调用trans.start(),当执行完毕后调用函数trans.getResult(),并把结果加到preResult中,返回该对象即可。
补充说明:
Result中也可以有处理数据,这些处理数据可以作为下一个Job项目(JobEntry)的输入。但是容量受内存容量限制。