stream配置表同步步骤

  • Post author:
  • Post category:其他



1

、创建用户


CREATE USER strmadmin IDENTIFIED BY strm****


DEFAULT TABLESPACE USERS


QUOTA UNLIMITED ON USERS;



GRANT DBA TO strmadmin;



BEGIN


DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(


grantee => ‘strmadmin’,


grant_privileges => TRUE);


END;


/


/*


connect strmadmin/***@lzmesdb


connect strmadmin/***@lzzlsj


*/



2

、创建

dblink


CREATE DATABASE LINK lzmesdb CONNECT TO strmadmin IDENTIFIED BY strmpw1a  USING ‘lzmesdb’;



CREATE DATABASE LINK lzzlsj CONNECT TO strmadmin IDENTIFIED BY strmpw1a  USING ‘lzzlsj’;



3

、创建目录


create directory script_dir as ‘/u01/app/oracle/product/11.2.0/dbhome_1/network/admin’;



4

、增加复制规则


DECLARE


TABLES   DBMS_UTILITY.uncl_array;


BEGIN


TABLES (1) := ‘QGLZAQD.AQD_JOBS’;


TABLES (2) := ‘QGLZAQD.AQD_MD_STEELGRADES’;


TABLES (3) := ‘QGLZAQD.AQD_PPL_JOBS’;


TABLES (4) := ‘QGLZAQD.AQD_PRODUCT_CLASS’;


TABLES(5):= ‘QGLZAQD.AQD_PRODUCTS’;


TABLES(6):= ‘QGLZAQD.AQD_RES_CATEGORIES’;


TABLES(7):= ‘QGLZAQD.AQD_RESOURCES’;


TABLES(8):= ‘QGLZAQD.AQD_MD_STG_CATEGORIES’;


TABLES(9):= ‘QGLZAQD.AQD_MD_STG_CLASSIFICATIONS’;


TABLES(10):=’QGLZAQD.AQD_MD_DOM_VALUES’;


TABLES(11):=’QGLZAQD.AQD_IN_PRD_QUALITIES’;


TABLES(12):=’QGLZAQD.AQD_INTERNAL_STEELGRADES’;





DBMS_STREAMS_ADM.maintain_tables


(table_names                       => TABLES,


source_directory_object           => NULL,


destination_directory_object      => NULL,


source_database                   => ‘LZZLSJ’,


destination_database              => ‘LZMESDB’,


perform_actions                   => FALSE,


script_name                       => ‘configure_rep.sql’,


script_directory_object           => ‘script_dir’,


bi_directional                    => FALSE,


include_ddl                       => false,


instantiation                     => DBMS_STREAMS_ADM.instantiation_table_network


);


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’)


LOOP


DBMS_STREAMS_ADM.rename_schema (rule_name             => v_dml_rule.rule_name,


from_schema_name      => ‘QGLZAQD’,


to_schema_name        => ‘QGLZMES’,


operation             => ‘ADD’


);



—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_JOBS’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’QGLZAQD.AQD_JOBS’ ,


to_table_name  =>’QGLZMES.STM_JOBS’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STEELGRADES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_MD_STEELGRADES’ ,


to_table_name  =>’STM_MD_STEELGRADES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PPL_JOBS’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_PPL_JOBS’ ,


to_table_name  =>’STM_PPL_JOBS’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PRODUCT_CLASS’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_PRODUCT_CLASS’ ,


to_table_name  =>’STM_PRODUCT_CLASS’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_PRODUCTS’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_PRODUCTS’ ,


to_table_name  =>’STM_PRODUCTS’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_RES_CATEGORIES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_RES_CATEGORIES’ ,


to_table_name  =>’STM_RES_CATEGORIES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_RESOURCES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_RESOURCES’ ,


to_table_name  =>’STM_RESOURCES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STG_CATEGORIES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_MD_STG_CATEGORIES’ ,


to_table_name  =>’STM_MD_STG_CATEGORIES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_STG_CLASSIFICATIONS’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_MD_STG_CLASSIFICATIONS’ ,


to_table_name  =>’STM_MD_STG_CLASSIFICATIONS’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_MD_DOM_VALUES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_MD_DOM_VALUES’ ,


to_table_name  =>’STM_MD_DOM_VALUES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_IN_PRD_QUALITIES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_IN_PRD_QUALITIES’ ,


to_table_name  =>’STM_IN_PRD_QUALITIES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/



DECLARE


—   v_dml_rule   VARCHAR2 (100);


BEGIN


FOR v_dml_rule IN (SELECT rule_name


FROM dba_streams_table_rules


WHERE streams_type = ‘APPLY’ AND rule_type = ‘DML’ and TABLE_NAME=’AQD_INTERNAL_STEELGRADES’)


LOOP


DBMS_STREAMS_ADM.RENAME_TABLE(


rule_name => v_dml_rule.rule_name,


from_table_name  =>’AQD_INTERNAL_STEELGRADES’ ,


to_table_name  =>’STM_INTERNAL_STEELGRADES’


);


—    dbms_output.put_line(v_dml_rule.rule_name);


END LOOP;


END;


/




DECLARE


iscn  NUMBER;         — Variable to hold instantiation SCN value


BEGIN


iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();


DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_JOBS’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_MD_STEELGRADES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_PPL_JOBS’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_PRODUCT_CLASS’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_PRODUCTS’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_RES_CATEGORIES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_RESOURCES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_MD_STG_CATEGORIES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_MD_STG_CLASSIFICATIONS’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_MD_DOM_VALUES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_IN_PRD_QUALITIES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);



DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘QGLZAQD.AQD_INTERNAL_STEELGRADES’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);




END;


/



7

、同步数据


insert into  STM_JOBS                           select * from  QGLZAQD.AQD_JOBS@lzzlsj;


insert into  STM_MD_STEELGRADES                 select * from  QGLZAQD.AQD_MD_STEELGRADES@lzzlsj;


insert into    STM_PPL_JOBS                     select * from  QGLZAQD.AQD_PPL_JOBS@lzzlsj;


insert into           STM_PRODUCT_CLASS         select * from  QGLZAQD.AQD_PRODUCT_CLASS          @lzzlsj;


insert into              STM_PRODUCTS           select * from  QGLZAQD.AQD_PRODUCTS               @lzzlsj;


insert into           STM_RES_CATEGORIES        select * from  QGLZAQD.AQD_RES_CATEGORIES         @lzzlsj;


insert into              STM_RESOURCES          select * from  QGLZAQD.AQD_RESOURCES              @lzzlsj;


insert into        STM_MD_STG_CATEGORIES        select * from  QGLZAQD.AQD_MD_STG_CATEGORIES      @lzzlsj;


insert into      STM_MD_STG_CLASSIFICATIONS     select * from  QGLZAQD.AQD_MD_STG_CLASSIFICATIONS @lzzlsj;


insert into           STM_MD_DOM_VALUES         select * from  QGLZAQD.AQD_MD_DOM_VALUES          @lzzlsj;


insert into           STM_INTERNAL_STEELGRADES  select * from  QGLZAQD.AQD_INTERNAL_STEELGRADES   @lzzlsj;


insert into             STM_IN_PRD_QUALITIES    select * from  QGLZAQD.AQD_IN_PRD_QUALITIES       @lzzlsj;



8

、增加心跳表


create table stream_heartbeat_table (id number primary key,cdate timestamp) tablespace users;


create sequence stream_heartbeat_seq start with 1;




begin


dbms_streams_adm.add_table_propagation_rules(


table_name => ‘stream_heartbeat_table’,


streams_name => ”,


source_queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,


destination_queue_name => ‘”STRMADMIN”.”LZZLSJ$APPQ”@lzmesdb’,


include_dml => TRUE,


include_ddl => false,


include_tagged_lcr => TRUE,


source_database => ‘lzzlsj’,


inclusion_rule => TRUE,


and_condition => NULL,


queue_to_queue => true);



dbms_streams_adm.add_table_rules(


table_name => ‘stream_heartbeat_table’,


streams_type => ‘CAPTURE’,


streams_name => ‘”LZZLSJ$CAP”‘,


queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,


include_dml => TRUE,


include_ddl => false,


include_tagged_lcr => TRUE,


source_database => ‘lzzlsj’,


inclusion_rule => TRUE);


end;



DECLARE


iscn  NUMBER;         — Variable to hold instantiation SCN value


BEGIN


iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();


DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@lzmesdb(


source_object_name    => ‘stream_heartbeat_table’,


source_database_name  => ‘lzzlsj’,


instantiation_scn     => iscn);


end;


/



begin


dbms_streams_adm.add_table_rules(


table_name => ‘stream_heartbeat_table’,


streams_type => ‘APPLY’,


streams_name => ”,


queue_name => ‘”STRMADMIN”.”LZZLSJ$APPQ”‘,


include_dml => TRUE,


include_ddl => false,


include_tagged_lcr => TRUE,


source_database => ‘lzzlsj’,


inclusion_rule => TRUE    );


end;


/



create or replace procedure update_heartbeat1


as


begin


update  stream_heartbeat_table  set id=stream_heartbeat_seq.nextval , cdate = systimestamp;


commit;


end;



begin


dbms_scheduler.create_job(job_name=>’update_heartbeat1_job1′,


job_type=>’STORED_PROCEDURE’,


job_action=>’update_heartbeat1′,


start_date=>systimestamp+5/(24*60*60),


repeat_interval => ‘FREQ=MINUTELY;INTERVAL=1;’,


enabled=>true,


comments=>’update into heartable stream_heartbeat_table’);


end;


/


启动和停止:



BEGIN


dbms_capture_adm.start_capture(


capture_name => ‘”LZZLSJ$CAP”‘);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -26666 THEN NULL;  — CAPTURE process already running


ELSE RAISE;


END IF;


END;


/



BEGIN


dbms_capture_adm.stop_capture(


capture_name => ‘”LZZLSJ$CAP”‘);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -26666 THEN NULL;  — CAPTURE process already running


ELSE RAISE;


END IF;


END;


/



DECLARE


q2q       VARCHAR2(10);


destn_q   VARCHAR2(65);


BEGIN


SELECT queue_to_queue INTO q2q


FROM dba_propagation


WHERE source_queue_owner = ‘STRMADMIN’ AND


source_queue_name = ‘LZZLSJ$CAPQ’ AND


destination_queue_owner = ‘STRMADMIN’ AND


destination_queue_name = ‘LZZLSJ$APPQ’ AND


destination_dblink = ‘LZMESDB’;



IF q2q = ‘TRUE’ THEN


destn_q := ‘”STRMADMIN”.”LZZLSJ$APPQ”‘;


ELSE


destn_q := NULL;


END IF;



dbms_aqadm.enable_propagation_schedule(


queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,


destination => ‘LZMESDB’,


destination_queue => destn_q);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -24064 THEN NULL; — propagation already enabled


ELSE RAISE;


END IF;


END;


/



DECLARE


q2q       VARCHAR2(10);


destn_q   VARCHAR2(65);


BEGIN


SELECT queue_to_queue INTO q2q


FROM dba_propagation


WHERE source_queue_owner = ‘STRMADMIN’ AND


source_queue_name = ‘LZZLSJ$CAPQ’ AND


destination_queue_owner = ‘STRMADMIN’ AND


destination_queue_name = ‘LZZLSJ$APPQ’ AND


destination_dblink = ‘LZMESDB’;



IF q2q = ‘TRUE’ THEN


destn_q := ‘”STRMADMIN”.”LZZLSJ$APPQ”‘;


ELSE


destn_q := NULL;


END IF;



dbms_aqadm.disable_propagation_schedule(


queue_name => ‘”STRMADMIN”.”LZZLSJ$CAPQ”‘,


destination => ‘LZMESDB’,


destination_queue => destn_q);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -24064 THEN NULL; — propagation already enabled


ELSE RAISE;


END IF;


END;


/




DECLARE


apply_nm VARCHAR2(32);


apply_nm_dqt VARCHAR2(32);


BEGIN


SELECT apply_name INTO apply_nm


FROM dba_apply_progress


WHERE source_database = ‘LZZLSJ’;



apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;


dbms_apply_adm.start_apply(


apply_name => apply_nm_dqt);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -26666 THEN NULL;  — APPLY process already running


ELSE RAISE;


END IF;


END;


/



DECLARE


apply_nm VARCHAR2(32);


apply_nm_dqt VARCHAR2(32);


BEGIN


SELECT apply_name INTO apply_nm


FROM dba_apply_progress


WHERE source_database = ‘LZZLSJ’;



apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;


dbms_apply_adm.stop_apply(


apply_name => apply_nm_dqt);


EXCEPTION WHEN OTHERS THEN


IF sqlcode = -26666 THEN NULL;  — APPLY process already running


ELSE RAISE;


END IF;


END;


/


启动顺序:

apply  (

源端

)prop capture

停止:

apply prop capture


以下显示的日志,不能删除


COLUMN CONSUMER_NAME HEADING ‘Capture|Process|Name’ FORMAT A15


COLUMN SOURCE_DATABASE HEADING ‘Source|Database’ FORMAT A10


COLUMN SEQUENCE# HEADING ‘Sequence|Number’ FORMAT 99999


COLUMN NAME HEADING ‘Required|Archived Redo Log|File Name’ FORMAT A40


SELECT r.CONSUMER_NAME,


r.SOURCE_DATABASE,


r.SEQUENCE#,


r.NAME


FROM DBA_REGISTERED_ARCHIVED_LOG r, DBA_CAPTURE c


WHERE r.CONSUMER_NAME = c.CAPTURE_NAME AND


r.NEXT_SCN >= c.REQUIRED_CHECKPOINT_SCN;

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/25702/viewspace-735246/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/25702/viewspace-735246/