1
、创建用户
CREATE USER strmadmin IDENTIFIED BY strm****
DEFAULT TABLESPACE USERS
QUOTA UNLIMITED ON USERS;
GRANT DBA TO strmadmin;
BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => ‘strmadmin’,
grant_privileges => TRUE);
END;
/
/*
connect strmadmin/***@lzmesdb
connect strmadmin/***@lzzlsj
*/
2
、创建
dblink
CREATE DATABASE LINK lzmesdb CONNECT TO strmadmin IDENTIFIED BY strmpw1a USING ‘lzmesdb’;
CREATE DATABASE LINK lzzlsj CONNECT TO strmadmin IDENTIFIED BY strmpw1a USING ‘lzzlsj’;
3
、创建目录
create directory script_dir as ‘/u01/app/oracle/product/11.2.0/dbhome_1/network/admin’;
4
、增加复制规则
DECLARE
TABLES DBMS_UTILITY.uncl_array;
BEGIN
TABLES (1) := ‘QGLZAQD.AQD_JOBS’;
TABLES (2) := ‘QGLZAQD.AQD_MD_STEELGRADES’;
TABLES (3) := ‘QGLZAQD.AQD_PPL_JOBS’;
TABLES (4) := ‘QGLZAQD.AQD_PRODUCT_CLASS’;
TABLES(5):= ‘QGLZAQD.AQD_PRODUCTS’;
TABLES(6):= ‘QGLZAQD.AQD_RES_CATEGORIES’;
TABLES(7):= ‘QGLZAQD.AQD_RESOURCES’;
TABLES(8):= ‘QGLZAQD.AQD_MD_STG_CATEGORIES’;
TABLES(9):= ‘QGLZAQD.AQD_MD_STG_CLASSIFICATIONS’;
TABLES(10):=’QGLZAQD.AQD_MD_DOM_VALUES’;
TABLES(11):=’QGLZAQD.AQD_IN_PRD_QUALITIES’;
TABLES(12):=’QGLZAQD.AQD_INTERNAL_STEELGRADES’;
DBMS_STREAMS_ADM.maintain_tables
(table_names => TABLES,
source_directory_object => NULL,
destination_directory_object => NULL,
source_database => ‘LZZLSJ’,
destination_database => ‘LZMESDB’,
perform_actions => FALSE,
script_name => ‘configure_rep.sql’,
script_directory_object => ‘script_dir’,
bi_directional => FALSE,
include_ddl => false,
instantiation => DBMS_STREAMS_ADM.instantiation_table_network
);
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’)
LOOP
DBMS_STREAMS_ADM.rename_schema (rule_name => v_dml_rule.rule_name,
from_schema_name => ‘QGLZAQD’,
to_schema_name => ‘QGLZMES’,
operation => ‘ADD’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_JOBS’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’QGLZAQD.AQD_JOBS’ ,
to_table_name =>’QGLZMES.STM_JOBS’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STEELGRADES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_MD_STEELGRADES’ ,
to_table_name =>’STM_MD_STEELGRADES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PPL_JOBS’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_PPL_JOBS’ ,
to_table_name =>’STM_PPL_JOBS’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PRODUCT_CLASS’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_PRODUCT_CLASS’ ,
to_table_name =>’STM_PRODUCT_CLASS’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PRODUCTS’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_PRODUCTS’ ,
to_table_name =>’STM_PRODUCTS’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_RES_CATEGORIES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_RES_CATEGORIES’ ,
to_table_name =>’STM_RES_CATEGORIES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_RESOURCES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_RESOURCES’ ,
to_table_name =>’STM_RESOURCES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STG_CATEGORIES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_MD_STG_CATEGORIES’ ,
to_table_name =>’STM_MD_STG_CATEGORIES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STG_CLASSIFICATIONS’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_MD_STG_CLASSIFICATIONS’ ,
to_table_name =>’STM_MD_STG_CLASSIFICATIONS’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_DOM_VALUES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_MD_DOM_VALUES’ ,
to_table_name =>’STM_MD_DOM_VALUES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_IN_PRD_QUALITIES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_IN_PRD_QUALITIES’ ,
to_table_name =>’STM_IN_PRD_QUALITIES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
— v_dml_rule VARCHAR2 (100);
BEGIN
FOR v_dml_rule IN (SELECT rule_name
FROM dba_streams_table_rules
WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_INTERNAL_STEELGRADES’)
LOOP
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name => v_dml_rule.rule_name,
from_table_name =>’AQD_INTERNAL_STEELGRADES’ ,
to_table_name =>’STM_INTERNAL_STEELGRADES’
);
— dbms_output.put_line(v_dml_rule.rule_name);
END LOOP;
END;
/
DECLARE
iscn NUMBER; — Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_JOBS’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_MD_STEELGRADES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_PPL_JOBS’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_PRODUCT_CLASS’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_PRODUCTS’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_RES_CATEGORIES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_RESOURCES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_MD_STG_CATEGORIES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_MD_STG_CLASSIFICATIONS’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_MD_DOM_VALUES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_IN_PRD_QUALITIES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘QGLZAQD.AQD_INTERNAL_STEELGRADES’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
END;
/
7
、同步数据
insert into STM_JOBS select * from QGLZAQD.AQD_JOBS@lzzlsj;
insert into STM_MD_STEELGRADES select * from QGLZAQD.AQD_MD_STEELGRADES@lzzlsj;
insert into STM_PPL_JOBS select * from QGLZAQD.AQD_PPL_JOBS@lzzlsj;
insert into STM_PRODUCT_CLASS select * from QGLZAQD.AQD_PRODUCT_CLASS @lzzlsj;
insert into STM_PRODUCTS select * from QGLZAQD.AQD_PRODUCTS @lzzlsj;
insert into STM_RES_CATEGORIES select * from QGLZAQD.AQD_RES_CATEGORIES @lzzlsj;
insert into STM_RESOURCES select * from QGLZAQD.AQD_RESOURCES @lzzlsj;
insert into STM_MD_STG_CATEGORIES select * from QGLZAQD.AQD_MD_STG_CATEGORIES @lzzlsj;
insert into STM_MD_STG_CLASSIFICATIONS select * from QGLZAQD.AQD_MD_STG_CLASSIFICATIONS @lzzlsj;
insert into STM_MD_DOM_VALUES select * from QGLZAQD.AQD_MD_DOM_VALUES @lzzlsj;
insert into STM_INTERNAL_STEELGRADES select * from QGLZAQD.AQD_INTERNAL_STEELGRADES @lzzlsj;
insert into STM_IN_PRD_QUALITIES select * from QGLZAQD.AQD_IN_PRD_QUALITIES @lzzlsj;
8
、增加心跳表
create table stream_heartbeat_table (id number primary key,cdate timestamp) tablespace users;
create sequence stream_heartbeat_seq start with 1;
begin
dbms_streams_adm.add_table_propagation_rules(
table_name => ‘stream_heartbeat_table’,
streams_name => ”,
source_queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,
destination_queue_name => ‘”STRMADMIN”.”LZZLSJ$APPQ”@lzmesdb’,
include_dml => TRUE,
include_ddl => false,
include_tagged_lcr => TRUE,
source_database => ‘lzzlsj’,
inclusion_rule => TRUE,
and_condition => NULL,
queue_to_queue => true);
dbms_streams_adm.add_table_rules(
table_name => ‘stream_heartbeat_table’,
streams_type => ‘CAPTURE’,
streams_name => ‘”LZZLSJ$CAP”‘,
queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,
include_dml => TRUE,
include_ddl => false,
include_tagged_lcr => TRUE,
source_database => ‘lzzlsj’,
inclusion_rule => TRUE);
end;
DECLARE
iscn NUMBER; — Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(
source_object_name => ‘stream_heartbeat_table’,
source_database_name => ‘lzzlsj’,
instantiation_scn => iscn);
end;
/
begin
dbms_streams_adm.add_table_rules(
table_name => ‘stream_heartbeat_table’,
streams_type => ‘APPLY’,
streams_name => ”,
queue_name => ‘”STRMADMIN”.”LZZLSJ$APPQ”‘,
include_dml => TRUE,
include_ddl => false,
include_tagged_lcr => TRUE,
source_database => ‘lzzlsj’,
inclusion_rule => TRUE );
end;
/
create or replace procedure update_heartbeat1
as
begin
update stream_heartbeat_table set id=stream_heartbeat_seq.nextval , cdate = systimestamp;
commit;
end;
begin
dbms_scheduler.create_job(job_name=>’update_heartbeat1_job1′,
job_type=>’STORED_PROCEDURE’,
job_action=>’update_heartbeat1′,
start_date=>systimestamp+5/(24*60*60),
repeat_interval => ‘FREQ=MINUTELY;INTERVAL=1;’,
enabled=>true,
comments=>’update into heartable stream_heartbeat_table’);
end;
/
启动和停止:
BEGIN
dbms_capture_adm.start_capture(
capture_name => ‘”LZZLSJ$CAP”‘);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — CAPTURE process already running
ELSE RAISE;
END IF;
END;
/
BEGIN
dbms_capture_adm.stop_capture(
capture_name => ‘”LZZLSJ$CAP”‘);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — CAPTURE process already running
ELSE RAISE;
END IF;
END;
/
DECLARE
q2q VARCHAR2(10);
destn_q VARCHAR2(65);
BEGIN
SELECT queue_to_queue INTO q2q
FROM dba_propagation
WHERE source_queue_owner = ‘STRMADMIN’ AND
source_queue_name = ‘LZZLSJ$CAPQ’ AND
destination_queue_owner = ‘STRMADMIN’ AND
destination_queue_name = ‘LZZLSJ$APPQ’ AND
destination_dblink = ‘LZMESDB’;
IF q2q = ‘TRUE’ THEN
destn_q := ‘”STRMADMIN”.”LZZLSJ$APPQ”‘;
ELSE
destn_q := NULL;
END IF;
dbms_aqadm.enable_propagation_schedule(
queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,
destination => ‘LZMESDB’,
destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -24064 THEN NULL; — propagation already enabled
ELSE RAISE;
END IF;
END;
/
DECLARE
q2q VARCHAR2(10);
destn_q VARCHAR2(65);
BEGIN
SELECT queue_to_queue INTO q2q
FROM dba_propagation
WHERE source_queue_owner = ‘STRMADMIN’ AND
source_queue_name = ‘LZZLSJ$CAPQ’ AND
destination_queue_owner = ‘STRMADMIN’ AND
destination_queue_name = ‘LZZLSJ$APPQ’ AND
destination_dblink = ‘LZMESDB’;
IF q2q = ‘TRUE’ THEN
destn_q := ‘”STRMADMIN”.”LZZLSJ$APPQ”‘;
ELSE
destn_q := NULL;
END IF;
dbms_aqadm.disable_propagation_schedule(
queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,
destination => ‘LZMESDB’,
destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -24064 THEN NULL; — propagation already enabled
ELSE RAISE;
END IF;
END;
/
DECLARE
apply_nm VARCHAR2(32);
apply_nm_dqt VARCHAR2(32);
BEGIN
SELECT apply_name INTO apply_nm
FROM dba_apply_progress
WHERE source_database = ‘LZZLSJ’;
apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;
dbms_apply_adm.start_apply(
apply_name => apply_nm_dqt);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — APPLY process already running
ELSE RAISE;
END IF;
END;
/
DECLARE
apply_nm VARCHAR2(32);
apply_nm_dqt VARCHAR2(32);
BEGIN
SELECT apply_name INTO apply_nm
FROM dba_apply_progress
WHERE source_database = ‘LZZLSJ’;
apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;
dbms_apply_adm.stop_apply(
apply_name => apply_nm_dqt);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — APPLY process already running
ELSE RAISE;
END IF;
END;
/
启动顺序:
apply (
源端
)prop capture
停止:
apply prop capture
以下显示的日志,不能删除
COLUMN CONSUMER_NAME HEADING ‘Capture|Process|Name’ FORMAT A15
COLUMN SOURCE_DATABASE HEADING ‘Source|Database’ FORMAT A10
COLUMN SEQUENCE# HEADING ‘Sequence|Number’ FORMAT 99999
COLUMN NAME HEADING ‘Required|Archived Redo Log|File Name’ FORMAT A40
SELECT r.CONSUMER_NAME,
r.SOURCE_DATABASE,
r.SEQUENCE#,
r.NAME
FROM DBA_REGISTERED_ARCHIVED_LOG r, DBA_CAPTURE c
WHERE r.CONSUMER_NAME = c.CAPTURE_NAME AND
r.NEXT_SCN >= c.REQUIRED_CHECKPOINT_SCN;
来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/25702/viewspace-735246/,如需转载,请注明出处,否则将追究法律责任。
转载于:http://blog.itpub.net/25702/viewspace-735246/