Oracle从0到1搭建一套ETL (不借助ETL工具),实现自动抽取数据功能

  • Post author:
  • Post category:其他


有时候我们在没有ETL工具的情况下,想实现简单的数据同步(ETL)功能,本文利用Oracle本身的功能(存储过程、函数、job等,不需要任何额外工具),从0到1搭建了一套ETL,可实现数据全量&增量同步、日志记录、自动调度等功能。

一、准备工作

1. 建立ETL日志记录表:用于记录ETL运行日志

CREATE TABLE ETL_LOG --ETL日志记录表
   (TABLE_NAME VARCHAR2(100), 
	VIEW_NAME VARCHAR2(100) , 
	UPDATE_CONDITION VARCHAR2(500), 
	PRO_NAME VARCHAR2(100), 
	START_TIMEKEY VARCHAR2(100), 
	END_TIMEKEY VARCHAR2(100), 
	INSERT_DATA NUMBER, 
	ETL_START_TIME VARCHAR2(100), 
	ETL_END_TIME VARCHAR2(100), 
	ETL_TIME NUMBER, 
	DELETE_CNT NUMBER, 
	UPDATE_CNT NUMBER, 
	INSERT_CNT NUMBER, 
	MESSAGE VARCHAR2(4000) , 
	OWNER VARCHAR2(100) 
   );

2. 建立ETL增量值记录表:记录每个表的增量值,作为下次同步该表的起点

CREATE TABLE ETL_INCREMENTAL --增量值记录表
   (TABLE_NAME VARCHAR2(100) , 
	VIEW_NAME VARCHAR2(100) , 
	UPDATE_CONDITION VARCHAR2(500) , 
	INCREMENTAL VARCHAR2(100) , 
	INTERFACE_TIME DATE
   ) ;

3. 建立getName函数:获取存储过程自身名称,用于日志记录

CREATE OR REPLACE FUNCTION P4SPOTUSER.getName
RETURN VARCHAR2 
AS
  l_owner varchar2(50);
  l_name  varchar2(50);
  l_lineno number;
  l_type  varchar2(50);
BEGIN 
  OWA_UTIL.who_called_me (l_owner, l_name, l_lineno, l_type);
  return l_owner || '.'|| l_name;
END;

二、ETL抽数程序(存储过程)建立

Oracle一般通过存储过程实现数据同步,根据特定同步需求,可在存储过程中做相应的实现,这里提供一个通用的存储过程,实现根据增量字段(一般为时间戳)增量抽取数据的功能,使用时只需传入目标表和源表(或试图)参数即可。

--创建一个通用的存储过程,实现增量抽取数据的功能。
CREATE OR REPLACE PROCEDURE PRO_COPY_BY_TIME_JO
(tablename in VARCHAR2, --目标表
viewname in VARCHAR2, --源表/视图
owner IN VARCHAR2 DEFAULT 'PUBLIC', --作者名字
starttimekey IN VARCHAR2 DEFAULT 'N', --同步开始时间值,取默认值时从增量表获取上次同步结束时间值
updatecondition IN VARCHAR2 DEFAULT '1=1', --同步数据的条件,用于where条件中限制源表数据,默认为不限制
endtimekey IN VARCHAR2 DEFAULT TO_CHAR(SYSDATE-2/24/60,'YYYY-MM-DD HH24:MI:SS') --同步结束时间值,默认同步到2分钟前的数据
)
AS
  ------------------1. 参数申明部分------------------
  ---------ETL LOG记录--与数据无关------------------------
  DELETE_CNT NUMBER :=0 ;
  UPDATE_CNT NUMBER :=0 ;
  INSERT_CNT NUMBER :=0;
  ETLBEGIN_TIME DATE := SYSDATE ;
  ETLEND_TIME DATE ;
  ETL_STATEMENT VARCHAR2(4000);
  ETL_TIME NUMBER; --记录ETL执行时间,单位为秒
  ---------ETL参数定义---------------------------
  LAST_TIMEKEY VARCHAR2(50);  --上次增长点时间参数
  CURRENT_TIMEKEY VARCHAR2(50);  --本次增长点时间参数
  sql_insert VARCHAR2(1000);  --复制表字符串
  sql_max_time VARCHAR2(1000);  --查询最大时间字符串
  INCREMENTALCOUNT NUMBER :=0 ;
 
BEGIN
  ------------------2. 逻辑处理部分------------------
	DBMS_OUTPUT.PUT_LINE ('ETLBEGIN_TIME:'||TO_CHAR(ETLBEGIN_TIME,'YYYY/MM/DD HH24:MI:SS')) ;
	--给LAST_TIMEKEY赋值(上次插入数据的最大UPDATE_TIMEKEY,格式同'2023-04-01 07:30:00') 
	IF starttimekey ='N' THEN 
		SELECT NVL(MAX(INCREMENTAL),TO_CHAR(ETLBEGIN_TIME-1/24,'YYYY-MM-DD HH24:MI:SS')) INTO LAST_TIMEKEY FROM ETL_INCREMENTAL WHERE TABLE_NAME =tablename AND VIEW_NAME =viewname AND UPDATE_CONDITION= updatecondition ;
	ELSE LAST_TIMEKEY := starttimekey;
	END IF ;
	DBMS_OUTPUT.PUT_LINE('LAST_TIMEKEY= '||LAST_TIMEKEY ); 
	--生成插入数据的SQL语句
	sql_insert := 'INSERT INTO '||tablename||' SELECT A.*, '||'TO_DATE('||CHR(39)||TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39)||',''YYYY-MM-DD HH24:MI:SS'')'||' AS INTERFACE_TIME FROM '||viewname||' A WHERE UPDATE_TIMEKEY > '||CHR(39)|| LAST_TIMEKEY ||CHR(39)||' AND UPDATE_TIMEKEY <= '||CHR(39)|| endtimekey ||CHR(39)||' AND '||updatecondition;
	execute immediate sql_insert; --执行SQL语句
	INSERT_CNT:= SQL%ROWCOUNT; --记录SQL执行行数

	IF INSERT_CNT > 0 
		THEN ETL_STATEMENT:= '抽取成功,插入数据:'||TO_CHAR(INSERT_CNT)||'条;'||ETL_STATEMENT;
		ELSE ETL_STATEMENT:= '无数据更新'||ETL_STATEMENT;
	END IF;
	DBMS_OUTPUT.PUT_LINE(ETL_STATEMENT); 

	--执行查询当前TIMEKEY并写入CURRENT_TIMEKEY
	sql_max_time := 'SELECT MAX(UPDATE_TIMEKEY) FROM '||tablename||' WHERE INTERFACE_TIME = '||'TO_DATE('||CHR(39)||TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39)||',''YYYY-MM-DD HH24:MI:SS'')' ;
	execute immediate sql_max_time INTO  CURRENT_TIMEKEY; 
	DBMS_OUTPUT.PUT_LINE('CURRENT_TIMEKEY ='|| CURRENT_TIMEKEY);
	--查看增量状态表ETL_INCREMENTAL是否有值
	SELECT COUNT(*) INTO INCREMENTALCOUNT FROM ETL_INCREMENTAL WHERE TABLE_NAME=tablename AND VIEW_NAME = viewname AND UPDATE_CONDITION = updatecondition;

	IF CURRENT_TIMEKEY IS NULL THEN DBMS_OUTPUT.PUT_LINE('CURRENT_TIMEKEY为空');
	ELSIF INCREMENTALCOUNT=0 THEN--代表增量状态表还未记录数据
		--插入增量状态表,表名 及 时间临界值INCREMENTAL
		INSERT INTO ETL_INCREMENTAL VALUES(tablename,viewname,updatecondition,CURRENT_TIMEKEY,ETLBEGIN_TIME);
	ELSE
	    --更新增量状态表时间临界值INCREMENTAL
	    UPDATE ETL_INCREMENTAL SET INCREMENTAL= CURRENT_TIMEKEY , INTERFACE_TIME = ETLBEGIN_TIME
	    WHERE TABLE_NAME=tablename AND VIEW_NAME = viewname AND UPDATE_CONDITION = updatecondition AND INCREMENTAL < CURRENT_TIMEKEY;
	END IF;
	COMMIT;--提交事务
	
	ETLEND_TIME:=SYSDATE;
	ETL_TIME:= ROUND(( ETLEND_TIME - ETLBEGIN_TIME )*24*3600,3);
	DBMS_OUTPUT.PUT_LINE ('ETLEND_TIME:'||TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS')||',ETL_TIME:'||ETL_TIME) ;

  ------------------正常记录-------------------------
	insert into ETL_LOG (TABLE_NAME, VIEW_NAME, UPDATE_CONDITION, PRO_NAME, START_TIMEKEY, END_TIMEKEY, INSERT_DATA, ETL_START_TIME, ETL_END_TIME, ETL_TIME, DELETE_CNT, UPDATE_CNT, INSERT_CNT, MESSAGE,OWNER     )
                    VALUES
                           ( tablename,viewname, updatecondition,getName,LAST_TIMEKEY,CURRENT_TIMEKEY,INSERT_CNT-DELETE_CNT,TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS'),TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS'),ETL_TIME,DELETE_CNT, UPDATE_CNT,INSERT_CNT, ETL_STATEMENT,owner ) ;
	COMMIT ;
	DBMS_OUTPUT.PUT_LINE('增长起点:'||LAST_TIMEKEY||'增长终点:'||CURRENT_TIMEKEY);

  ------------------3. 异常处理部分------------------
EXCEPTION
	WHEN OTHERS THEN
       DBMS_OUTPUT.PUT_LINE('EXECUTE PROCEDURE '|| getName ||' FAILED.');
       ETL_STATEMENT:=  SUBSTR(SQLERRM,1,3000);
       ETLEND_TIME:= SYSDATE;
       ETL_TIME:= ROUND(( ETLEND_TIME - ETLBEGIN_TIME )*24*3600,3);
       insert into ETL_LOG (TABLE_NAME, VIEW_NAME, UPDATE_CONDITION,PRO_NAME, START_TIMEKEY, END_TIMEKEY, INSERT_DATA, ETL_START_TIME, ETL_END_TIME, ETL_TIME, DELETE_CNT, UPDATE_CNT, INSERT_CNT, MESSAGE,OWNER     )
                    VALUES
                           ( tablename,viewname ,updatecondition,getName,LAST_TIMEKEY,CURRENT_TIMEKEY,INSERT_CNT-DELETE_CNT,TO_CHAR(ETLBEGIN_TIME,'YYYY-MM-DD HH24:MI:SS'),TO_CHAR(ETLEND_TIME ,'YYYY-MM-DD HH24:MI:SS'),ETL_TIME,DELETE_CNT, UPDATE_CNT,INSERT_CNT, ETL_STATEMENT,owner  ) ;
       COMMIT;
END;

三、手动运行测试ETL程序(存储过程)

1. 确认源表(视图)

要求源表增量字段名称为UPDATE_TIMEKEY,格式为’YYYY-MM-DD HH24:MI:SS’,如果不符合,需要转换为符合上述格式的视图,或改写上面的存储过程。

假设有源表(视图):DWD_SCORE_V,如下:


NAME

SUBJECT

SCORE

UPDATE_TIMEKEY

张三

语文

80

2023-06-06 10:30:00

张三

数学

90

2023-06-06 10:30:00

李四

语文

85

2023-06-06 10:30:00

李四

数学

88

2023-06-06 10:30:00

2. 确认(建立)目标表

目标表的字段以及顺序与源表完全相同,并在最后增加一列INTERFACE_TIME,格式为DATE,本案例可参考以下建表语句建立目标表DWD_SCORE_T:

CREATE TABLE DWD_SCORE_T
(NAME VARCHAR2(20),
SUBJECT VARCHAR2(20),
SCORE NUMBER,
UPDATE_TIMEKEY VARCHAR2(20),
INTERFACE_TIME DATE
);

建立的表如下:


NAME

SUBJECT

SCORE

UPDATE_TIMEKEY

INTERFACE_TIME

3. 运行ETL程序(存储过程)

CALL PRO_COPY_BY_TIME_JO ('DWD_SCORE_T','DWD_SCORE_V','JOGARY','2023-06-06 10:00:00');

运行输出如下:

ETLBEGIN_TIME:2023/06/07 11:11:40

LAST_TIMEKEY= 2023-06-06 10:00:00

抽取成功,插入数据:4条;

CURRENT_TIMEKEY =2023-06-06 10:30:00

ETLEND_TIME:2023-06-07 11:11:40,ETL_TIME:0

增长起点:2023-06-06 10:00:00增长终点:2023-06-06 10:30:00

此时目标表DWD_SCORE_T内容如下:


NAME

SUBJECT

SCORE

UPDATE_TIMEKEY

INTERFACE_TIME

张三

语文

80

2023-06-06 10:30:00

2023-06-07 11:11:40

张三

数学

90

2023-06-06 10:30:00

2023-06-07 11:11:40

李四

语文

85

2023-06-06 10:30:00

2023-06-07 11:11:40

李四

数学

88

2023-06-06 10:30:00

2023-06-07 11:11:40

ETL增量表ETL_INCREMENTAL内容如下:


TABLE_NAME

VIEW_NAME

UPDATE_CONDITION

INCREMENTAL

INTERFACE_TIME

DWD_SCORE_T

DWD_SCORE_V

1=1

2023-06-06 10:30:00

2023-06-07 11:11:40

ETL日志记录表内容如下:


TABLE_NAME

VIEW_NAME

UPDATE_CONDITION

PRO_NAME

START_TIMEKEY

END_TIMEKEY

INSERT_DATA

ETL_START_TIME

ETL_END_TIME

ETL_TIME

DELETE_CNT

UPDATE_CNT

INSERT_CNT

MESSAGE

OWNER

DWD_SCORE_T

DWD_SCORE_V

1=1

P4SPOTUSER.PRO_COPY_BY_TIME_JO

2023-06-06 10:00:00

2023-06-06 10:30:00

4

2023-06-07 11:11:40

2023-06-07 11:11:40

0

0

0

4

抽取成功,插入数据:4条;

JOGARY

假设源表新增了一条数据,再次运行ETL程序,可以把新增的数据同步过来。

由于ETL增量表已经记录上次的增量值,程序会自动调用此增量值作为本次ETL起点,因此不需要提供开始时间了

CALL PRO_COPY_BY_TIME_JO ('DWD_SCORE_T','DWD_SCORE_V','JOGARY');

运行上述同步后,

目标表DWD_SCORE_T内容如下:增加了一条“王五”的数据记录


NAME

SUBJECT

SCORE

UPDATE_TIMEKEY

INTERFACE_TIME

张三

语文

80

2023-06-06 10:30:00

2023-06-07 11:11:40

张三

数学

90

2023-06-06 10:30:00

2023-06-07 11:11:40

李四

语文

85

2023-06-06 10:30:00

2023-06-07 11:11:40

李四

数学

88

2023-06-06 10:30:00

2023-06-07 11:11:40

王五

语文

92

2023-06-06 12:10:00

2023-06-07 12:11:04

ETL增量表ETL_INCREMENTAL内容如下:INCREMENTAL更新了


TABLE_NAME

VIEW_NAME

UPDATE_CONDITION

INCREMENTAL

INTERFACE_TIME

DWD_SCORE_T

DWD_SCORE_V

1=1

2023-06-06 12:10:00

2023-06-07 12:11:04

ETL日志记录表内容如下:增加了一条抽数记录


TABLE_NAME

VIEW_NAME

UPDATE_CONDITION

PRO_NAME

START_TIMEKEY

END_TIMEKEY

INSERT_DATA

ETL_START_TIME

ETL_END_TIME

ETL_TIME

DELETE_CNT

UPDATE_CNT

INSERT_CNT

MESSAGE

OWNER

DWD_SCORE_T

DWD_SCORE_V

1=1

P4SPOTUSER.PRO_COPY_BY_TIME_JO

2023-06-06 10:00:00

2023-06-06 10:30:00

4

2023-06-07 11:11:40

2023-06-07 11:11:40

0

0

0

4

抽取成功,插入数据:4条;

JOGARY

DWD_SCORE_T

DWD_SCORE_V

1=1

P4SPOTUSER.PRO_COPY_BY_TIME_JO

2023-06-06 10:30:00

2023-06-06 11:10:00

1

2023-06-07 12:11:04

2023-06-07 12:11:04
0 0 0 1
抽取成功,插入数据:1条;

JOGARY

四、建立自动调度任务

要实现自动同步数据,还需要建立调度任务,一般企业数据库管理是通过调度工具来调度的,调度工具可以完善的实现任务管理、日志管理、历史记录、错误处理等。其实Oracle自身就有调度功能,用一些简单的代码就能实现。代码如下:

--创建job,自动生成job ID,分配的ID比较随机,建议使用下面的指定ID
declare
  job number;
BEGIN
  DBMS_JOB.SUBMIT(
        JOB => job,  /*自动生成JOB_ID*/  
        WHAT => '/*成绩表单任务----JOGARY*/
				PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'');
				',  /*需要执行的存储过程名称或SQL语句*/  
        NEXT_DATE => SYSDATE,  /*初次执行时间-立即执行*/  
        INTERVAL => 'TRUNC(SYSDATE,''DD'')+1+7/24' /*每隔1天执行一次,每天7点执行*/
      );  
  commit;
end;

--创建job,指定job ID
BEGIN
  DBMS_JOB.ISUBMIT(
        JOB => 5001,  /*指定JOB_ID*/  
        WHAT => '/*成绩表单任务----JOGARY*/
				PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'');
				',  /*需要执行的存储过程名称或SQL语句*/  
        NEXT_DATE => SYSDATE,  /*初次执行时间-立即执行*/  
        INTERVAL => 'TRUNC(SYSDATE,''HH'')+1/24+10/24/60' /*每隔1小时执行一次,每小时的10分开始执行*/
      );  
  commit;
end;

--修改job某一项
--更改updatecondition
begin
  dbms_job.what(5001 ,'/*成绩表单任务----JOGARY*/
				PRO_COPY_BY_TIME_JO (''DWD_SCORE_T'',''DWD_SCORE_V'',''JOGARY'',''SUBJECT=''''语文'''''');
');
commit;
end;
--更改执行频率
begin
  dbms_job.INTERVAL(5001,'TRUNC(SYSDATE,''HH'')+1/24+20/60/24');
commit;
end;
--更改下次执行时间
begin
  dbms_job.NEXT_DATE(5001,SYSDATE);
commit;
end;

--停止 job
begin   
  dbms_job.broken(5001,true,sysdate);        /*sysdate:立刻停止,也可以设置某一时刻停止*/
commit;
end;

--启动停止的job
begin   
  dbms_job.broken(5001,false,sysdate);        /*sysdate:立刻启动,也可以设置某一时刻启动*/
commit;
end;

--删除 job
begin
  dbms_job.remove(5001);  
 commit;
end;

至此,我们就完成了整套ETL过程的建立。

以下是通过一些SQL来查询以及处理ETL运行中的各种情况

--查看所有job
select TO_NUMBER(SUBSTR(JOB_NAME,11,10)) JOB_ID,
JOB_CREATOR,OWNER,JOB_ACTION,
TO_CHAR(START_DATE,'YYYY-MM-DD HH24:MI:SS') START_DATE,REPEAT_INTERVAL,ENABLED,STATE,RUN_COUNT,FAILURE_COUNT,
TO_CHAR(LAST_START_DATE,'YYYY-MM-DD HH24:MI:SS') LAST_START_DATE,LAST_RUN_DURATION,
TO_CHAR(NEXT_RUN_DATE,'YYYY-MM-DD HH24:MI:SS') NEXT_RUN_DATE
from dba_scheduler_jobs t
where JOB_NAME like 'DBMS_JOB%'
ORDER BY JOB_ID ;   

--查询正在运行的job
select * from dba_scheduler_running_jobs t;

--查询job运行历史记录
select * from dba_scheduler_job_log t order by log_date desc;

--查看日志
SELECT e.*,e.OWNER FROM ETL_LOG e 
ORDER BY ETL_START_TIME ASC ;

--查看增量表
SELECT * FROM ETL_INCREMENTAL ;

--查看存储过程的错误
Select * from user_errors where name='PRO_COPY_BY_TIME_JO';

--停止正在运行的job
SELECT a.* FROM dba_jobs_running a;
SELECT sid, serial# FROM v$session WHERE sid IN ('1767');
SELECT a.*,b.serial# FROM dba_jobs_running a,v$session b
WHERE a.sid = b.sid;
ALTER system kill SESSION '1316,35612';



版权声明:本文为weixin_46156257原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。