有时候我们在没有ETL工具的情况下,想实现简单的数据同步(ETL)功能,本文利用Oracle本身的功能(存储过程、函数、job等,不需要任何额外工具),从0到1搭建了一套ETL,可实现数据全量&增量同步、日志记录、自动调度等功能。
一、准备工作
1. 建立ETL日志记录表:用于记录ETL运行日志
CREATE TABLE ETL_LOG --ETL日志记录表
(TABLE_NAME VARCHAR2(100),
VIEW_NAME VARCHAR2(100) ,
UPDATE_CONDITION VARCHAR2(500),
PRO_NAME VARCHAR2(100),
START_TIMEKEY VARCHAR2(100),
END_TIMEKEY VARCHAR2(100),
INSERT_DATA NUMBER,
ETL_START_TIME VARCHAR2(100),
ETL_END_TIME VARCHAR2(100),
ETL_TIME NUMBER,
DELETE_CNT NUMBER,
UPDATE_CNT NUMBER,
INSERT_CNT NUMBER,
MESSAGE VARCHAR2(4000) ,
OWNER VARCHAR2(100)
);
2. 建立ETL增量值记录表:记录每个表的增量值,作为下次同步该表的起点
CREATE TABLE ETL_INCREMENTAL --增量值记录表
(TABLE_NAME VARCHAR2(100) ,
VIEW_NAME VARCHAR2(100) ,
UPDATE_CONDITION VARCHAR2(500) ,
INCREMENTAL VARCHAR2(100) ,
INTERFACE_TIME DATE
) ;
3. 建立getName函数:获取存储过程自身名称,用于日志记录
CREATE OR REPLACE FUNCTION P4SPOTUSER.getName
RETURN VARCHAR2
AS
l_owner varchar2(50);
l_name varchar2(50);
l_lineno number;
l_type varchar2(50);
BEGIN
OWA_UTIL.who_called_me (l_owner, l_name, l_lineno, l_type);
return l_owner || '.'|| l_name;
END;
二、ETL抽数程序(存储过程)建立
Oracle一般通过存储过程实现数据同步,根据特定同步需求,可在存储过程中做相应的实现,这里提供一个通用的存储过程,实现根据增量字段(一般为时间戳)增量抽取数据的功能,使用时只需传入目标表和源表(或试图)参数即可。
--创建一个通用的存储过程,实现增量抽取数据的功能。
CREATE OR REPLACE PROCEDURE PRO_COPY_BY_TIME_JO
(tablename in VARCHAR2, --目标表
viewname in VARCHAR2, --源表/视图
owner IN VARCHAR2 DEFAULT 'PUBLIC', --作者名字
starttimekey IN VARCHAR2 DEFAULT 'N', --同步开始时间值,取默认值时从增量表获取上次同步结束时间值
updatecondition IN VARCHAR2 DEFAULT '1=1', --同步数据的条件,用于where条件中限制源表数据,默认为不限制
endtimekey IN VARCHAR2 DEFAULT TO_CHAR(SYSDATE-2/24/60,'YYYY-MM-DD HH24:MI:SS') --同步结束时间值,默认同步到2分钟前的数据
)
AS
------------------1. 参数申明部分------------------
---------ETL LOG记录--与数据无关------------------------
DELETE_CNT NUMBER :=0 ;
UPDATE_CNT NUMBER :=0 ;
INSERT_CNT NUMBER :=0;
ETLBEGIN_TIME DATE := SYSDATE ;
ETLEND_TIME DATE ;
ETL_STATEMENT VARCHAR2(4000);
ETL_TIME NUMBER; --记录ETL执行时间,单位为秒
---------ETL参数定义---------------------------
LAST_TIMEKEY VARCHAR2(50); --上次增长点时间参数
CURRENT_TIMEKEY VARCHAR2(50); --本次增长点时间参数
sql_insert VARCHAR2(1000); --复制表字符串
sql_max_time VARCHAR2(1000); --查询最大时间字符串
INCREMENTALCOUNT NUMBER :=0 ;
BEGIN
------------------2. 逻辑处理部分------------------
DBMS_OUTPUT.PUT_LINE ('ETLBEGIN_TIME:'||TO_CHAR(ETLBEGIN_TIME,'YYYY/MM/DD HH24:MI:SS')) ;
--给LAST_TIMEKEY赋值(上次插入数据的最大UPDATE_TIMEKEY,格式同'2023-04-01 07:30:00')
IF starttimekey ='N' THEN
SELECT NVL(MAX(INCREMENTAL),TO_CHAR(ETLBEGIN_TIME-1/24,'YYYY-MM-DD HH24:MI:SS')) INTO LAST_TIMEKEY FROM ETL_INCREMENTAL WHERE TABLE_NAME =tablename AND VIEW_NAME =viewname AND UPDATE_CONDITION= updatecondition ;
ELSE LAST_TIMEKEY := starttimekey;
END IF ;
DBMS_OUTPUT.PUT_LINE('LAST_TIMEKEY= '||LAST_TIMEKEY );
--生成插入数据的SQL语句
sql_insert := 'INSERT INTO '||tablename||' SELECT A.*, '||'TO_DATE('||CHR(39)||TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39)||',''YYYY-MM-DD HH24:MI:SS'')'||' AS INTERFACE_TIME FROM '||viewname||' A WHERE UPDATE_TIMEKEY > '||CHR(39)|| LAST_TIMEKEY ||CHR(39)||' AND UPDATE_TIMEKEY <= '||CHR(39)|| endtimekey ||CHR(39)||' AND '||updatecondition;
execute immediate sql_insert; --执行SQL语句
INSERT_CNT:= SQL%ROWCOUNT; --记录SQL执行行数
IF INSERT_CNT > 0
THEN ETL_STATEMENT:= '抽取成功,插入数据:'||TO_CHAR(INSERT_CNT)||'条;'||ETL_STATEMENT;
ELSE ETL_STATEMENT:= '无数据更新'||ETL_STATEMENT;
END IF;
DBMS_OUTPUT.PUT_LINE(ETL_STATEMENT);
--执行查询当前TIMEKEY并写入CURRENT_TIMEKEY
sql_max_time := 'SELECT MAX(UPDATE_TIMEKEY) FROM '||tablename||' WHERE INTERFACE_TIME = '||'TO_DATE('||CHR(39)||TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39)||',''YYYY-MM-DD HH24:MI:SS'')' ;
execute immediate sql_max_time INTO CURRENT_TIMEKEY;
DBMS_OUTPUT.PUT_LINE('CURRENT_TIMEKEY ='|| CURRENT_TIMEKEY);
--查看增量状态表ETL_INCREMENTAL是否有值
SELECT COUNT(*) INTO INCREMENTALCOUNT FROM ETL_INCREMENTAL WHERE TABLE_NAME=tablename AND VIEW_NAME = viewname AND UPDATE_CONDITION = updatecondition;
IF CURRENT_TIMEKEY IS NULL THEN DBMS_OUTPUT.PUT_LINE('CURRENT_TIMEKEY为空');
ELSIF INCREMENTALCOUNT=0 THEN--代表增量状态表还未记录数据
--插入增量状态表,表名 及 时间临界值INCREMENTAL
INSERT INTO ETL_INCREMENTAL VALUES(tablename,viewname,updatecondition,CURRENT_TIMEKEY,ETLBEGIN_TIME);
ELSE
--更新增量状态表时间临界值INCREMENTAL
UPDATE ETL_INCREMENTAL SET INCREMENTAL= CURRENT_TIMEKEY , INTERFACE_TIME = ETLBEGIN_TIME
WHERE TABLE_NAME=tablename AND VIEW_NAME = viewname AND UPDATE_CONDITION = updatecondition AND INCREMENTAL < CURRENT_TIMEKEY;
END IF;
COMMIT;--提交事务
ETLEND_TIME:=SYSDATE;
ETL_TIME:= ROUND(( ETLEND_TIME - ETLBEGIN_TIME )*24*3600,3);
DBMS_OUTPUT.PUT_LINE ('ETLEND_TIME:'||TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS')||',ETL_TIME:'||ETL_TIME) ;
------------------正常记录-------------------------
insert into ETL_LOG (TABLE_NAME, VIEW_NAME, UPDATE_CONDITION, PRO_NAME, START_TIMEKEY, END_TIMEKEY, INSERT_DATA, ETL_START_TIME, ETL_END_TIME, ETL_TIME, DELETE_CNT, UPDATE_CNT, INSERT_CNT, MESSAGE,OWNER )
VALUES
( tablename,viewname, updatecondition,getName,LAST_TIMEKEY,CURRENT_TIMEKEY,INSERT_CNT-DELETE_CNT,TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS'),TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS'),ETL_TIME,DELETE_CNT, UPDATE_CNT,INSERT_CNT, ETL_STATEMENT,owner ) ;
COMMIT ;
DBMS_OUTPUT.PUT_LINE('增长起点:'||LAST_TIMEKEY||'增长终点:'||CURRENT_TIMEKEY);
------------------3. 异常处理部分------------------
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('EXECUTE PROCEDURE '|| getName ||' FAILED.');
ETL_STATEMENT:= SUBSTR(SQLERRM,1,3000);
ETLEND_TIME:= SYSDATE;
ETL_TIME:= ROUND(( ETLEND_TIME - ETLBEGIN_TIME )*24*3600,3);
insert into ETL_LOG (TABLE_NAME, VIEW_NAME, UPDATE_CONDITION,PRO_NAME, START_TIMEKEY, END_TIMEKEY, INSERT_DATA, ETL_START_TIME, ETL_END_TIME, ETL_TIME, DELETE_CNT, UPDATE_CNT, INSERT_CNT, MESSAGE,OWNER )
VALUES
( tablename,viewname ,updatecondition,getName,LAST_TIMEKEY,CURRENT_TIMEKEY,INSERT_CNT-DELETE_CNT,TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS'),TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS'),ETL_TIME,DELETE_CNT, UPDATE_CNT,INSERT_CNT, ETL_STATEMENT,owner ) ;
COMMIT;
END;
三、手动运行测试ETL程序(存储过程)
1. 确认源表(视图)
要求源表增量字段名称为UPDATE_TIMEKEY,格式为’YYYY-MM-DD HH24:MI:SS’,如果不符合,需要转换为符合上述格式的视图,或改写上面的存储过程。
假设有源表(视图):DWD_SCORE_V,如下:
NAME |
SUBJECT |
SCORE |
UPDATE_TIMEKEY |
---|---|---|---|
张三 |
语文 |
80 |
2023-06-06 10:30:00 |
张三 |
数学 |
90 |
2023-06-06 10:30:00 |
李四 |
语文 |
85 |
2023-06-06 10:30:00 |
李四 |
数学 |
88 |
2023-06-06 10:30:00 |
2. 确认(建立)目标表
目标表的字段以及顺序与源表完全相同,并在最后增加一列INTERFACE_TIME,格式为DATE,本案例可参考以下建表语句建立目标表DWD_SCORE_T:
CREATE TABLE DWD_SCORE_T
(NAME VARCHAR2(20),
SUBJECT VARCHAR2(20),
SCORE NUMBER,
UPDATE_TIMEKEY VARCHAR2(20),
INTERFACE_TIME DATE
);
建立的表如下:
NAME |
SUBJECT |
SCORE |
UPDATE_TIMEKEY |
INTERFACE_TIME |
---|---|---|---|---|
3. 运行ETL程序(存储过程)
CALL PRO_COPY_BY_TIME_JO ('DWD_SCORE_T','DWD_SCORE_V','JOGARY','2023-06-06 10:00:00');
运行输出如下:
ETLBEGIN_TIME:2023/06/07 11:11:40
LAST_TIMEKEY= 2023-06-06 10:00:00
抽取成功,插入数据:4条;
CURRENT_TIMEKEY =2023-06-06 10:30:00
ETLEND_TIME:2023-06-07 11:11:40,ETL_TIME:0
增长起点:2023-06-06 10:00:00增长终点:2023-06-06 10:30:00
此时目标表DWD_SCORE_T内容如下:
NAME |
SUBJECT |
SCORE |
UPDATE_TIMEKEY |
INTERFACE_TIME |
---|---|---|---|---|
张三 |
语文 |
80 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
张三 |
数学 |
90 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
李四 |
语文 |
85 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
李四 |
数学 |
88 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
ETL增量表ETL_INCREMENTAL内容如下:
TABLE_NAME |
VIEW_NAME |
UPDATE_CONDITION |
INCREMENTAL |
INTERFACE_TIME |
---|---|---|---|---|
DWD_SCORE_T |
DWD_SCORE_V |
1=1 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
ETL日志记录表内容如下:
TABLE_NAME |
VIEW_NAME |
UPDATE_CONDITION |
PRO_NAME |
START_TIMEKEY |
END_TIMEKEY |
INSERT_DATA |
ETL_START_TIME |
ETL_END_TIME |
ETL_TIME |
DELETE_CNT |
UPDATE_CNT |
INSERT_CNT |
MESSAGE |
OWNER |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
DWD_SCORE_T |
DWD_SCORE_V |
1=1 |
P4SPOTUSER.PRO_COPY_BY_TIME_JO |
2023-06-06 10:00:00 |
2023-06-06 10:30:00 |
4 |
2023-06-07 11:11:40 |
2023-06-07 11:11:40 |
0 |
0 |
0 |
4 |
抽取成功,插入数据:4条; |
JOGARY |
假设源表新增了一条数据,再次运行ETL程序,可以把新增的数据同步过来。
由于ETL增量表已经记录上次的增量值,程序会自动调用此增量值作为本次ETL起点,因此不需要提供开始时间了
CALL PRO_COPY_BY_TIME_JO ('DWD_SCORE_T','DWD_SCORE_V','JOGARY');
运行上述同步后,
目标表DWD_SCORE_T内容如下:增加了一条“王五”的数据记录
NAME |
SUBJECT |
SCORE |
UPDATE_TIMEKEY |
INTERFACE_TIME |
---|---|---|---|---|
张三 |
语文 |
80 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
张三 |
数学 |
90 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
李四 |
语文 |
85 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
李四 |
数学 |
88 |
2023-06-06 10:30:00 |
2023-06-07 11:11:40 |
王五 |
语文 |
92 |
2023-06-06 12:10:00 |
2023-06-07 12:11:04 |
ETL增量表ETL_INCREMENTAL内容如下:INCREMENTAL更新了
TABLE_NAME |
VIEW_NAME |
UPDATE_CONDITION |
INCREMENTAL |
INTERFACE_TIME |
---|---|---|---|---|
DWD_SCORE_T |
DWD_SCORE_V |
1=1 |
2023-06-06 12:10:00 |
2023-06-07 12:11:04 |
ETL日志记录表内容如下:增加了一条抽数记录
TABLE_NAME |
VIEW_NAME |
UPDATE_CONDITION |
PRO_NAME |
START_TIMEKEY |
END_TIMEKEY |
INSERT_DATA |
ETL_START_TIME |
ETL_END_TIME |
ETL_TIME |
DELETE_CNT |
UPDATE_CNT |
INSERT_CNT |
MESSAGE |
OWNER |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
DWD_SCORE_T |
DWD_SCORE_V |
1=1 |
P4SPOTUSER.PRO_COPY_BY_TIME_JO |
2023-06-06 10:00:00 |
2023-06-06 10:30:00 |
4 |
2023-06-07 11:11:40 |
2023-06-07 11:11:40 |
0 |
0 |
0 |
4 |
抽取成功,插入数据:4条; |
JOGARY |
DWD_SCORE_T |
DWD_SCORE_V |
1=1 |
P4SPOTUSER.PRO_COPY_BY_TIME_JO |
2023-06-06 10:30:00 |
2023-06-06 11:10:00 |
1 |
2023-06-07 12:11:04 |
2023-06-07 12:11:04 |
0 | 0 | 0 | 1 |
抽取成功,插入数据:1条; |
JOGARY |
四、建立自动调度任务
要实现自动同步数据,还需要建立调度任务,一般企业数据库管理是通过调度工具来调度的,调度工具可以完善的实现任务管理、日志管理、历史记录、错误处理等。其实Oracle自身就有调度功能,用一些简单的代码就能实现。代码如下:
--创建job,自动生成job ID,分配的ID比较随机,建议使用下面的指定ID
declare
job number;
BEGIN
DBMS_JOB.SUBMIT(
JOB => job, /*自动生成JOB_ID*/
WHAT => '/*成绩表单任务----JOGARY*/
PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'');
', /*需要执行的存储过程名称或SQL语句*/
NEXT_DATE => SYSDATE, /*初次执行时间-立即执行*/
INTERVAL => 'TRUNC(SYSDATE,''DD'')+1+7/24' /*每隔1天执行一次,每天7点执行*/
);
commit;
end;
--创建job,指定job ID
BEGIN
DBMS_JOB.ISUBMIT(
JOB => 5001, /*指定JOB_ID*/
WHAT => '/*成绩表单任务----JOGARY*/
PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'');
', /*需要执行的存储过程名称或SQL语句*/
NEXT_DATE => SYSDATE, /*初次执行时间-立即执行*/
INTERVAL => 'TRUNC(SYSDATE,''HH'')+1/24+10/24/60' /*每隔1小时执行一次,每小时的10分开始执行*/
);
commit;
end;
--修改job某一项
--更改updatecondition
begin
dbms_job.what(5001 ,'/*成绩表单任务----JOGARY*/
PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'',''SUBJECT=''''语文'''''');
');
commit;
end;
--更改执行频率
begin
dbms_job.INTERVAL(5001,'TRUNC(SYSDATE,''HH'')+1/24+20/60/24');
commit;
end;
--更改下次执行时间
begin
dbms_job.NEXT_DATE(5001,SYSDATE);
commit;
end;
--停止 job
begin
dbms_job.broken(5001,true,sysdate); /*sysdate:立刻停止,也可以设置某一时刻停止*/
commit;
end;
--启动停止的job
begin
dbms_job.broken(5001,false,sysdate); /*sysdate:立刻启动,也可以设置某一时刻启动*/
commit;
end;
--删除 job
begin
dbms_job.remove(5001);
commit;
end;
至此,我们就完成了整套ETL过程的建立。
以下是通过一些SQL来查询以及处理ETL运行中的各种情况
--查看所有job
select TO_NUMBER(SUBSTR(JOB_NAME,11,10)) JOB_ID,
JOB_CREATOR,OWNER,JOB_ACTION,
TO_CHAR(START_DATE,'YYYY-MM-DD HH24:MI:SS') START_DATE,REPEAT_INTERVAL,ENABLED,STATE,RUN_COUNT,FAILURE_COUNT,
TO_CHAR(LAST_START_DATE,'YYYY-MM-DD HH24:MI:SS') LAST_START_DATE,LAST_RUN_DURATION,
TO_CHAR(NEXT_RUN_DATE,'YYYY-MM-DD HH24:MI:SS') NEXT_RUN_DATE
from dba_scheduler_jobs t
where JOB_NAME like 'DBMS_JOB%'
ORDER BY JOB_ID ;
--查询正在运行的job
select * from dba_scheduler_running_jobs t;
--查询job运行历史记录
select * from dba_scheduler_job_log t order by log_date desc;
--查看日志
SELECT e.*,e.OWNER FROM ETL_LOG e
ORDER BY ETL_START_TIME ASC ;
--查看增量表
SELECT * FROM ETL_INCREMENTAL ;
--查看存储过程的错误
Select * from user_errors where name='PRO_COPY_BY_TIME_JO';
--停止正在运行的job
SELECT a.* FROM dba_jobs_running a;
SELECT sid, serial# FROM v$session WHERE sid IN ('1767');
SELECT a.*,b.serial# FROM dba_jobs_running a,v$session b
WHERE a.sid = b.sid;
ALTER system kill SESSION '1316,35612';