12C数据库Goldengate同步异构数据库Kafka中间件之二

  • Post author:
  • Post category:其他


前两天测试环境的需求将上线生产环境,需求还是

a. 数据源:SSP库 ssp.m_system_user,Oracle DB 12.1.0.2.0,Ogg Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401_FBO

b. 数据目标:MySQL DLS库 DLS_SYSTEM_USER

c.kafka集群:10.1.1.247 ,Ogg Version 12.3.0.1.0 OGGCORE_OGGADP.12.3.0.1.0GA_PLATFORMS_170828.1608

由于oracle 12c已经是多租户架构,在使用OGG同步的时候,需要考虑下面一些情况

一个 CDB包含多个PDB

抽取模式只能是integrated(集成)模式,不支持claasic capture传统方式捕获;

因为要使用integrated extract,因此,需要能访问log mining server,而这个只能从cdb$root中访问;

源端要使用common user,即c##ogg这种用户来访问源端DB,这样能访问DB的redo log & all pdbs。

在GGSCI或参数文件中,可以使用pdb.schema.table来访问具体的表或序列;

可以在参数文件 中使用sourceCatalog参数,指定一个PDB,后面的参数中只需要schema.table即可;

目标端每个pdb要有一个replicat进程,即一个replicat进程只能投递到一个PDB,不能投递到多个。

源端OGG用户需要赋权:dbms_goldengate_auth.grant_admin_privilege(‘C##GGADMIN’,container=>‘all’),同时建议将ogg的用户设置赋权为:grant dba to c##ogg container=all;

源端DB除了以前要打开归档, force logging, 最小附加日志,可能还需要打开一个开关:alter system set enable_goldengate_replication=true;

具体实施步骤;

1、为要同步的表添加附加日志

dblogin userid ogg@SALESPDB,password OGG_PROD

add trandata ssp.m_system_user

2、 添加抽取进程

add extract EXT_KAF4,integrated tranlog, begin now –12c区别

add EXTTRAIL ./dirdat/k4, extract EXT_KAF4,MEGABYTES 200

GGSCI (salesdb as ogg@salesdb/SALESPDB) 17> dblogin useridalias ggroot

Successfully logged into database CDB$ROOT.

–得在cdb里面注册ext进程

register extract EXT_KAF4 database container(SALESPDB)

edit params EXT_KAF4

extract EXT_KAF4

userid c##ggadmin,PASSWORD ggadmin

LOGALLSUPCOLS

UPDATERECORDFORMAT COMPACT

exttrail ./dirdat/k4,FORMAT RELEASE 12.1

SOURCECATALOG SALESPDB

table ssp.m_system_user;

3、添加投递进程:

add extract PMP_KAF4, exttrailsource ./dirdat/k4

add rmttrail ./dirdat/b4,EXTRACT PMP_KAF4,MEGABYTES 200

eidt params PMP_KAF4

EXTRACT PMP_KAF4

USERID c##ggadmin,PASSWORD ggadmin

PASSTHRU

RMTHOST 10.1.1.247, MGRPORT 9178

RMTTRAIL ./dirdat/b4,format release 12.1

SOURCECATALOG SALESPDB

table ssp.m_system_user;

4.添加初始化进程

ADD EXTRACT ek_04, sourceistable —源端添加

EXTRACT ek_04

USERID c##ggadmin,PASSWORD ggadmin

RMTHOST 10.1.1.247, MGRPORT 9178

RMTFILE ./dirdat/b5,maxfiles 999, megabytes 500,format release 12.1

SOURCECATALOG SALESPDB

table ssp.m_system_user;

5.生成def文件:

edit param salesdb4

USERID c##ggadmin,PASSWORD ggadmin

defsfile /home/oracle/ogg/ggs12/dirdef/salesdb4.def,format release 12.1

SOURCECATALOG SALESPDB

table ssp.m_system_user;

在OGG_HOME下执行如下命令生成def文件

defgen paramfile ./dirprm/salesdb4.prm

将生成的def文件传到目标端kafka–$OGG_HOME/dirdef下

—mysql 数据库地址:10.1.11.24 mysql地址

—kafka地址 10.1.1.246:0000,10.1.1.247:0000 主题:DLS_MERCHANT

1、添加初始化进程:—dirprm

ADD replicat rp_06,specialrun

EDIT PARAMS rp_06

SPECIALRUN

end runtime

setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)

targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props

SOURCEDEFS ./dirdef/salesdb4.def

EXTFILE ./dirdat/b5

reportcount every 1 minutes, rate

grouptransops 10000

map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;

2、添加复制进程:

add replicat rep_04,exttrail ./dirdat/b4

edit params rep_04

REPLICAT rep_04

setenv(NLS_LANG=”AMERICAN_AMERICA.ZHS16GBK”)

HANDLECOLLISIONS

targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props

SOURCEDEFS ./dirdef/salesdb4.def

reportcount every 1 minutes, rate

grouptransops 10000

map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;

3、参数配置:

cd /home/appgroup/ogg/ggs12/dirprm

custom_kafka_producer.properties 文件

vi kafka_k05.props

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

#The following resolves the topic name using the short table name

gg.handler.kafkahandler.topicMappingTemplate= DLS_MERCHANT

#The following selects the message key using the concatenated primary keys

#gg.handler.kafkahandler.keyMappingTemplate=

#gg.handler.kafkahandler.format=avro_op

gg.handler.kafkahandler.format =json

gg.handler.kafkahandler.format.insertOpKey=I

gg.handler.kafkahandler.format.updateOpKey=U

gg.handler.kafkahandler.format.deleteOpKey=D

gg.handler.kafkahandler.format.truncateOpKey=T

gg.handler.kafkahandler.format.prettyPrint=false

gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]

gg.handler.kafkahandler.format.includePrimaryKeys=true

gg.handler.kafkahandler.SchemaTopicName= DLS_MERCHANT

gg.handler.kafkahandler.BlockingSend =false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=op

goldengate.userexit.timestamp=utc

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka

gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/


#Sample gg.classpath for HDP

#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/


javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

—-启动各个进程

1,源端抽取,投递,初始化进程启动

2,目标端启动初始化进程、执行初始化脚本,启动复制进程

start rp_06

./replicat paramfile ./dirprm/rp_06.prm reportfile ./dirrpt/rp_06.rpt -p INITIALDATALOAD

start rep_04



版权声明:本文为weixin_34380948原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。