Introduction to Flink Oracle CDC
   
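Flink CDC captures Oracle data in real time on top of Flink. Under the hood it uses the Debezium framework, which in turn relies on Oracle's built-in LogMiner. LogMiner-based capture requires supplemental logging to be enabled on both the database and the captured tables. Because Oracle 18c does not support adding supplemental logging, three versions are currently supported: Oracle 11, 12, and 19.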
    
    
    Using Flink Oracle CDC
   
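Flink Oracle CDC can be used in two ways: SQL and the API. Oracle must have archive logging and supplemental logging enabled for capture to work, and you must provide an account with sufficient privileges to connect to the Oracle database.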
    
    
    Enabling archive logging
   
# Connect to Oracle
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
  CONNECT sys/password AS SYSDBA
-- Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
    
    
    Enabling supplemental logging for the database and tables
   
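-- Enable supplemental logging for all columns of a specific table; supplemental logging supports the ALL and PRIMARY KEY modes
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for the database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;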
    
    
    Providing a user with the required privileges
   
sqlplus sys/password@host:port/SID AS SYSDBA;
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  -- Not required if scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  exit;
    For detailed usage of Flink, refer to the official website.
    
   
    
    
    Source code walkthrough
   
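The walkthrough is based on the API approach, so first prepare a source environment: add the jar through Maven and download the corresponding sources. You can then read and debug the source code comfortably in IDEA.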
    
    
    Adding the Maven dependency
   
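The latest version on the official website is 2.4, the latest release is 2.3.0, and my debugging environment uses 2.2.0. All code below is based on 2.2.0.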
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oracle-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.2.0</version>
</dependency>
    
    
    Using Oracle CDC through the API
   
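import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;

        // The statements below belong inside a main method declared with "throws Exception".
        Properties properties = new Properties();
        properties.put("decimal.handling.mode", "double");
        // Connect by SID with an explicit JDBC URL
        properties.put("database.url","jdbc:oracle:thin:@127.0.0.1:1521:orcl");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("localhost")
                .port(1521)
                .database("orcl") // database to monitor
                .schemaList("flinkuser") // schema to monitor
                .tableList("flinkuser.test") // table to monitor, as schema.table
                .username("flinkuser")
                .password("flinkpw")
                .startupOptions(StartupOptions.latest())
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(sourceFunction).print();
        env.execute();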
    OracleSource acts as a utility class: calling build() returns a DebeziumSourceFunction object, and before returning it, build() applies the parameters that were set on the builder.
   
public DebeziumSourceFunction<T> build() {
            Properties props = new Properties();
            props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
            // Logical name that identifies and provides a namespace for the particular Oracle
            // database server being
            // monitored. The logical name should be unique across all other connectors, since it is
            // used as a prefix
            // for all Kafka topic names emanating from this connector. Only alphanumeric characters
            // and
            // underscores should be used.
            props.setProperty("database.server.name", DATABASE_SERVER_NAME);
            props.setProperty("database.hostname", checkNotNull(hostname));
            props.setProperty("database.user", checkNotNull(username));
            props.setProperty("database.password", checkNotNull(password));
            props.setProperty("database.port", String.valueOf(port));
            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
            props.setProperty("database.dbname", checkNotNull(database));
            if (schemaList != null) {
                props.setProperty("schema.whitelist", String.join(",", schemaList));
            }
            if (tableList != null) {
                props.setProperty("table.include.list", String.join(",", tableList));
            }
            DebeziumOffset specificOffset = null;
            switch (startupOptions.startupMode) {
                case INITIAL:
                    props.setProperty("snapshot.mode", "initial");
                    break;
                case LATEST_OFFSET:
                    props.setProperty("snapshot.mode", "schema_only");
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
            if (dbzProperties != null) {
                props.putAll(dbzProperties);
            }
            return new DebeziumSourceFunction<>(
                    deserializer, props, specificOffset, new OracleValidator(props));
        }
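Because build() applies the user-supplied Debezium properties last (props.putAll(dbzProperties)), any key passed through debeziumProperties overrides the value the builder set earlier. The following is a minimal, self-contained sketch of that ordering using only java.util.Properties; the class name PropertyOverrideSketch and the sample values are illustrative and not part of the connector.

```java
import java.util.Properties;

public class PropertyOverrideSketch {

    // Mimics the order used in build(): builder defaults first, user properties last.
    static Properties merge(Properties userProps) {
        Properties props = new Properties();
        props.setProperty("database.hostname", "localhost");
        props.setProperty("snapshot.mode", "schema_only");
        if (userProps != null) {
            // Entries with an existing key replace the earlier value.
            props.putAll(userProps);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("snapshot.mode", "initial");
        user.setProperty("database.url", "jdbc:oracle:thin:@127.0.0.1:1521:orcl");

        Properties merged = merge(user);
        System.out.println(merged.getProperty("snapshot.mode"));
        System.out.println(merged.getProperty("database.url"));
        System.out.println(merged.getProperty("database.hostname"));
    }
}
```

This is why setting database.url through debeziumProperties is enough to change how the connection string is formed, even though the builder also sets hostname, port, and database name.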
    Following the DebeziumSourceFunction source, its run() method submits the request to parse Oracle's real-time logs:
   
// create the engine with this configuration ...
        this.engine =
                DebeziumEngine.create(Connect.class)
                        .using(properties)
                        .notifying(changeConsumer)
                        .using(OffsetCommitPolicy.always())
                        .using(
                                (success, message, error) -> {
                                    if (success) {
                                        // Close the handover and prepare to exit.
                                        handover.close();
                                    } else {
                                        handover.reportError(error);
                                    }
                                })
                        .build();
        // run the engine asynchronously
        executor.execute(engine);
        debeziumStarted = true;
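The pattern above, handing a Runnable to an executor and learning about its outcome through a completion callback, can be sketched with the JDK alone. Everything here (AsyncEngineSketch, CompletionCallback, runAndReport) is a hypothetical stand-in for illustration and not the actual Debezium or Flink CDC API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncEngineSketch {

    // Hypothetical stand-in for the engine's completion callback.
    interface CompletionCallback {
        void handle(boolean success, String message, Throwable error);
    }

    // Hypothetical stand-in for the engine: a Runnable that reports how it finished.
    static Runnable engine(Runnable work, CompletionCallback callback) {
        return () -> {
            try {
                work.run();
                callback.handle(true, "completed", null);
            } catch (Throwable t) {
                callback.handle(false, t.getMessage(), t);
            }
        };
    }

    // Runs the engine on another thread and returns the status recorded by the callback.
    static String runAndReport(Runnable work) {
        AtomicReference<String> status = new AtomicReference<>("not finished");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine(work, (success, message, error) ->
                status.set(success ? "closed" : "error: " + message)));
        executor.shutdown(); // already submitted work still runs to completion
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return status.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(() -> { }));
        System.out.println(runAndReport(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

The success branch corresponds to handover.close() in the original snippet and the failure branch to handover.reportError(error); the sketch only records a status string in their place.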
    
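    DebeziumEngine.build() is implemented by io.debezium.embedded.EmbeddedEngine.BuilderImpl#build, which returns an EmbeddedEngine object. EmbeddedEngine implements Runnable, and its run method carries out the entire capture pipeline (abridged below):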
   
@Override
    public void run() {
        if (runningThread.compareAndSet(null, Thread.currentThread())) {
                ....
                // Instantiate the connector ...
                SourceConnector connector = null;
                try {
                    @SuppressWarnings("unchecked")
                    Class<? extends SourceConnector> connectorClass = (Class<SourceConnector>) classLoader.loadClass(connectorClassName);
                    connector = connectorClass.getDeclaredConstructor().newInstance();
                }
                // Instantiate the offset store ...
                final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
                OffsetBackingStore offsetStore = null;
                try {
                    @SuppressWarnings("unchecked")
                    Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);
                    offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance();
                }
                ....
                // Initialize the offset store ...
                try {
                    offsetStore.configure(workerConfig);
                    offsetStore.start();
                }
                ....
                // Set up the offset commit policy ...
                if (offsetCommitPolicy == null) {
                    offsetCommitPolicy = Instantiator.getInstanceWithProperties(config.getString(EmbeddedEngine.OFFSET_COMMIT_POLICY),
                            () -> getClass().getClassLoader(), config.asProperties());
                }
                // Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
                ConnectorContext context = new ConnectorContext() ;
                ....
                connector.initialize(context);
                OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName,
                        keyConverter, valueConverter);
                OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName,
                        keyConverter, valueConverter);
                Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
                try {
                    // Start the connector with the given properties and get the task configurations ...
                    connector.start(config.asMap());
                    connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
                    List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
                    Class<? extends Task> taskClass = connector.taskClass();
                    if (taskConfigs.isEmpty()) {
                        String msg = "Unable to start connector's task class '" + taskClass.getName() + "' with no task configuration";
                        fail(msg);
                        return;
                    }
                    task = null;
                    try {
                        task = (SourceTask) taskClass.getDeclaredConstructor().newInstance();
                    }
                    catch (IllegalAccessException | InstantiationException t) {
                        fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                        return;
                    }
                    try {
                        SourceTaskContext taskContext = new SourceTaskContext() ;
                        ......
                        task.initialize(taskContext);
                        task.start(taskConfigs.get(0));
                        connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
                    }
                    ......
                    recordsSinceLastCommit = 0;
                    Throwable handlerError = null;
                    try {
                        timeOfLastCommitMillis = clock.currentTimeInMillis();
                        RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
                        while (runningThread.get() != null) {
                            List<SourceRecord> changeRecords = null;
                            try {
                                LOGGER.debug("Embedded engine is polling task for records on thread {}", runningThread.get());
                                changeRecords = task.poll(); // blocks until there are values ...
                                LOGGER.debug("Embedded engine returned from polling task for records");
                            }
                            catch (InterruptedException e) {
                                // Interrupted while polling ...
                                LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get());
                                if (this.runningThread.get() == Thread.currentThread()) {
                                    // this thread is still set as the running thread -> we were not interrupted
                                    // due the stop() call -> probably someone else called the interrupt on us ->
                                    // -> we should raise the interrupt flag
                                    Thread.currentThread().interrupt();
                                }
                                break;
                            }
                            catch (RetriableException e) {
                                LOGGER.info("Retrieable exception thrown, connector will be restarted", e);
                                // Retriable exception should be ignored by the engine
                                // and no change records delivered.
                                // The retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
                            }
                            try {
                                if (changeRecords != null && !changeRecords.isEmpty()) {
                                    LOGGER.debug("Received {} records from the task", changeRecords.size());
                                    changeRecords = changeRecords.stream()
                                            .map(transformations::transform)
                                            .filter(x -> x != null)
                                            .collect(Collectors.toList());
                                }
                                if (changeRecords != null && !changeRecords.isEmpty()) {
                                    LOGGER.debug("Received {} transformed records from the task", changeRecords.size());
                                    try {
                                        handler.handleBatch(changeRecords, committer);
                                    }
                                    catch (StopConnectorException e) {
                                        break;
                                    }
                                }
                                else {
                                    LOGGER.debug("Received no records from the task");
                                }
                            }
                            catch (Throwable t) {
                                // There was some sort of unexpected exception, so we should stop work
                                handlerError = t;
                                break;
                            }
                        }
                    }
                    ...
            }
        }
    }
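- Instantiates the connector via reflection, obtaining an OracleConnector object
- Initializes the offset store
- Sets the offset commit policy
- Passes the configuration properties to the connector through connector.start(config.asMap());
- Obtains the SourceTask class from the connector and instantiates it via reflection; here this yields an OracleConnectorTask object
- Calls task.start(taskConfigs.get(0)); to start the task that captures Oracle change data. The method is io.debezium.connector.oracle.OracleConnectorTask#start, implemented as follows: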
@Override
   public ChangeEventSourceCoordinator start(Configuration config) {
       OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
       TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
       SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
       Configuration jdbcConfig = connectorConfig.jdbcConfig();
       jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
       this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
       this.schema.initializeStorage();
       String adapterString = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER);
       OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString);
       OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter));
       if (previousOffset != null) {
           schema.recover(previousOffset);
       }
       taskContext = new OracleTaskContext(connectorConfig, schema);
       Clock clock = Clock.system();
       // Set up the task record queue ...
       this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
               .pollInterval(connectorConfig.getPollInterval())
               .maxBatchSize(connectorConfig.getMaxBatchSize())
               .maxQueueSize(connectorConfig.getMaxQueueSize())
               .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
               .build();
       errorHandler = new OracleErrorHandler(connectorConfig.getLogicalName(), queue);
       final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();
       EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
               connectorConfig,
               topicSelector,
               schema,
               queue,
               connectorConfig.getTableFilters().dataCollectionFilter(),
               DataChangeEvent::new,
               metadataProvider,
               schemaNameAdjuster);
       final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider,
               connectorConfig);
       ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
               previousOffset,
               errorHandler,
               OracleConnector.class,
               connectorConfig,
               new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema, jdbcConfig, taskContext, streamingMetrics),
               new OracleChangeEventSourceMetricsFactory(streamingMetrics),
               dispatcher,
               schema);
       coordinator.start(taskContext, this.queue, metadataProvider);
       return coordinator;
   }
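- Creates a task context object, taskContext, which holds the task's parameters and schema properties
- Sets up a message queue, queue, which holds the parsed messages
- Creates an event dispatcher object, dispatcher, which delivers the parsed data to the queue
- Creates an io.debezium.pipeline.ChangeEventSourceCoordinator object and calls io.debezium.pipeline.ChangeEventSourceCoordinator#start. That method calls streamEvents, which eventually calls io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource#execute, the method that ultimately parses the Oracle logs: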
public void execute(ChangeEventSourceContext context) {
       try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(schema, clock, errorHandler, streamingMetrics)) {
           try {
               startScn = offsetContext.getScn();
               createFlushTable(jdbcConnection);
               if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) {
                   throw new DebeziumException(
                           "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ".  Please perform a new snapshot.");
               }
               setNlsSessionParameters(jdbcConnection);
               checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
               initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention);
               HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
               try {
                   // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
                   historyRecorder.prepare(streamingMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
                   final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection,
                           connectorConfig, streamingMetrics, transactionalBuffer, offsetContext, schema, dispatcher,
                           clock, historyRecorder);
                   final String query = SqlUtils.logMinerContentsQuery(connectorConfig, jdbcConnection.username());
                   try (PreparedStatement miningView = jdbcConnection.connection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY,
                           ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
                       currentRedoLogSequences = getCurrentRedoLogSequences();
                       Stopwatch stopwatch = Stopwatch.reusable();
                       while (context.isRunning()) {
                           // Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server
                           streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));
                           Instant start = Instant.now();
                           endScn = getEndScn(jdbcConnection, startScn, streamingMetrics, connectorConfig.getLogMiningBatchSizeDefault());
                           flushLogWriter(jdbcConnection, jdbcConfiguration, isRac, racHosts);
                           if (hasLogSwitchOccurred()) {
                               // This is the way to mitigate PGA leaks.
                               // With one mining session, it grows and maybe there is another way to flush PGA.
                               // At this point we use a new mining session
                               LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
                                       startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
                               endMining(jdbcConnection);
                               initializeRedoLogsForMining(jdbcConnection, true, archiveLogRetention);
                               abandonOldTransactionsIfExist(jdbcConnection, transactionalBuffer);
                               // This needs to be re-calculated because building the data dictionary will force the
                               // current redo log sequence to be advanced due to a complete log switch of all logs.
                               currentRedoLogSequences = getCurrentRedoLogSequences();
                           }
                           startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining, streamingMetrics);
                           stopwatch.start();
                           miningView.setFetchSize(connectorConfig.getMaxQueueSize());
                           miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
                           miningView.setString(1, startScn.toString());
                           miningView.setString(2, endScn.toString());
                           try (ResultSet rs = miningView.executeQuery()) {
                               Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
                               streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
                               processor.processResult(rs);
                               startScn = endScn;
                               if (transactionalBuffer.isEmpty()) {
                                   LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
                                   offsetContext.setScn(startScn);
                               }
                           }
                           streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
                           pauseBetweenMiningSessions();
                       }
                   }
               }
               finally {
                   historyRecorder.close();
               }
           }
           catch (Throwable t) {
               logError(streamingMetrics, "Mining session stopped due to the {}", t);
               errorHandler.setProducerThrowable(t);
           }
           finally {
               LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
               LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
               LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
           }
       }
   }
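- Creates a helper table that records the latest SCN; flushLogWriter in the loop above updates it before each mining pass
CREATE TABLE LOG_MINING_FLUSH (LAST_SCN NUMBER(19,0));
- Checks whether supplemental logging is enabled on the database and tables
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
- Calls the database's data dictionary build procedure and adds the database's archived logs and online logs to LogMiner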
   private void initializeRedoLogsForMining(OracleConnection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
        if (!postEndMiningSession) {
            if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
                buildDataDictionary(connection);
            }
            if (!isContinuousMining) {
                setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
            }
        }
        else {
            if (!isContinuousMining) {
                if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
                    buildDataDictionary(connection);
                }
                setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
            }
        }
    }
The code above executes the following statements in the database:
BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;
-- List the online logs
SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, L.STATUS 
                 FROM V$LOG L, V$LOGFILE F 
                 WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0 
                 GROUP BY F.GROUP#, L.NEXT_CHANGE#, L.FIRST_CHANGE#, L.STATUS ORDER BY 3;
-- List the archived logs
SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE, FIRST_CHANGE# AS FIRST_CHANGE 
        FROM V$ARCHIVED_LOG
        WHERE NAME IS NOT NULL 
        AND ARCHIVED = 'YES' 
        AND STATUS = 'A' 
        AND NEXT_CHANGE# > '?' -- the SCN from the previous mining pass
        AND DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) ORDER BY 2;
The archived logs are merged with the online logs, and each file in the merged list is added to LogMiner for parsing (fileName is the path of the log file):
"BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName + "', OPTIONS => DBMS_LOGMNR.ADDFILE);END;"
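To make the "one add_logfile call per file" step concrete, here is a small sketch that builds the PL/SQL text for a list of log files, following the statement shape shown above. The class name AddLogFileSketch and the file paths in main are made up for illustration; the sketch only builds strings and does not talk to a database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AddLogFileSketch {

    // Builds one DBMS_LOGMNR.ADD_LOGFILE call per log file name.
    static List<String> addLogFileStatements(List<String> fileNames) {
        List<String> statements = new ArrayList<>();
        for (String fileName : fileNames) {
            statements.add("BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName
                    + "', OPTIONS => DBMS_LOGMNR.ADDFILE);END;");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Hypothetical paths standing in for the merged online and archived log list.
        List<String> files = Arrays.asList(
                "/opt/oracle/oradata/SID/redo01.log",
                "/opt/oracle/oradata/recovery_area/arch_0001.arc");
        for (String statement : addLogFileStatements(files)) {
            System.out.println(statement);
        }
    }
}
```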
- Calls sys.dbms_logmnr.start_logmnr to start mining the added logs; the mined results are made available in V$LOGMNR_CONTENTS
   static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
        String miningStrategy;
        if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
        }
        else {
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
        }
        if (isContinuousMining) {
            miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
        }
        return "BEGIN sys.dbms_logmnr.start_logmnr(" +
                "startScn => '" + startScn + "', " +
                "endScn => '" + endScn + "', " +
                "OPTIONS => " + miningStrategy +
                " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
                "END;";
    }
The statement that finally queries the results:
SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK FROM V$LOGMNR_CONTENTS WHERE SCN > '2468014' AND SCN <= '2468297' AND ((OPERATION_CODE IN (5,34) AND USERNAME NOT IN ('SYS','SYSTEM','FLINKUSER')) OR (OPERATION_CODE IN (7,36)) OR (OPERATION_CODE IN (1,2,3) AND TABLE_NAME != 'LOG_MINING_FLUSH' AND SEG_OWNER NOT IN ('APPQOSSYS','AUDSYS','CTXSYS','DVSYS','DBSFWUSER','DBSNMP','GSMADMIN_INTERNAL','LBACSYS','MDSYS','OJVMSYS','OLAPSYS','ORDDATA','ORDSYS','OUTLN','SYS','SYSTEM','WMSYS','XDB') AND (REGEXP_LIKE(SEG_OWNER,'^flinkuser$','i')) AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^flinkuser.test$','i')) ))
Parsing the query results:
         miningView.setFetchSize(connectorConfig.getMaxQueueSize());
         miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
         miningView.setString(1, startScn.toString());
         miningView.setString(2, endScn.toString());
         try (ResultSet rs = miningView.executeQuery()) {
             Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
             streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
             processor.processResult(rs);
             startScn = endScn;
             if (transactionalBuffer.isEmpty()) {
                 LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
                 offsetContext.setScn(startScn);
             }
         }
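The way startScn and endScn advance across mining passes can be sketched as a simple windowing loop. This is a deliberate simplification: the real getEndScn computes the upper bound from the database and streaming metrics, whereas here it is assumed to be the smaller of startScn plus a fixed batch size and the current SCN. ScnWindowSketch and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class ScnWindowSketch {

    // Returns the {startScn, endScn} windows mined until startScn catches up with currentScn.
    static List<long[]> windows(long startScn, long currentScn, long batchSize) {
        List<long[]> result = new ArrayList<>();
        while (startScn < currentScn) {
            long endScn = Math.min(startScn + batchSize, currentScn);
            // Each window is queried as SCN > startScn AND SCN <= endScn.
            result.add(new long[] {startScn, endScn});
            // The next pass resumes where this one ended.
            startScn = endScn;
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] w : windows(2468014L, 2468297L, 100L)) {
            System.out.println("SCN > " + w[0] + " AND SCN <= " + w[1]);
        }
    }
}
```

Because each window is open at the start and closed at the end, setting startScn = endScn after a pass neither skips nor repeats an SCN.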
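    The class and method that do the parsing are io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor#processResult. This class parses the SQL statements: it extracts the columns in each statement and their corresponding values into two arrays, wraps them in an Entry object, and passes that to the converter. The details of this process will be covered later.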
   
    
    
    Problems encountered in use
   
    
    
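    Connection fails when Oracle's dbName is configured as a SID
   
    If Oracle's dbName is configured as a SID rather than a service_name, the connection fails. The reason is that the Oracle URL is assembled by default in the form jdbc:oracle:thin:@localhost:1521/service_name, whereas connecting by SID requires the form jdbc:oracle:thin:@localhost:1521:sid. Oracle CDC lets you specify the Oracle connection string manually, in which case the connection uses the URL you provide.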
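The two URL forms differ only in the separator before the last component: a slash for a service name and a colon for a SID. A tiny sketch makes the difference explicit; OracleJdbcUrlSketch and its method names are illustrative helpers, not part of the connector.

```java
public class OracleJdbcUrlSketch {

    // Thin-driver URL that addresses the database by service name.
    static String serviceNameUrl(String host, int port, String serviceName) {
        return "jdbc:oracle:thin:@" + host + ":" + port + "/" + serviceName;
    }

    // Thin-driver URL that addresses the database by SID.
    static String sidUrl(String host, int port, String sid) {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    public static void main(String[] args) {
        System.out.println(serviceNameUrl("localhost", 1521, "service_name"));
        System.out.println(sidUrl("localhost", 1521, "sid"));
    }
}
```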
   
    
    
    Specifying it through the API
   
properties.put("database.url","jdbc:oracle:thin:@localhost:1521:sid");
    
    
    Specifying it through Flink SQL
   
    Properties set through SQL need a debezium. prefix:
   
CREATE TABLE GSP_PURCHASE_ORDER(
     ORDER_ID STRING NOT NULL,
     EBELN STRING,
     BSTYP STRING,
     PRIMARY KEY(ORDER_ID) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'debezium.database.url' = 'jdbc:oracle:thin:@localhost:1521:RACTEST1',
     'debezium.database.tablename.case.insensitive' = 'false',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = '****',
     'password' = '****',
     'database-name' = 'RACTEST1',
     'schema-name' = 'GSP_MODULE_UAT',
     'table-name' = 'GSP_PURCHASE_ORDER');
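Conceptually, the debezium. prefix marks which table options should be handed to Debezium, with the prefix removed. The sketch below shows that mapping under the assumption that the prefix is simply stripped; it is an illustration of the idea, not the connector's actual implementation, and DebeziumPrefixSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class DebeziumPrefixSketch {

    static final String PREFIX = "debezium.";

    // Copies every option that starts with "debezium." into Properties, without the prefix.
    static Properties extractDebeziumProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            if (option.getKey().startsWith(PREFIX)) {
                props.setProperty(option.getKey().substring(PREFIX.length()), option.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "oracle-cdc");
        options.put("debezium.database.url", "jdbc:oracle:thin:@localhost:1521:RACTEST1");
        options.put("debezium.database.tablename.case.insensitive", "false");

        Properties props = extractDebeziumProperties(options);
        System.out.println(props.getProperty("database.url"));
        System.out.println(props.getProperty("database.tablename.case.insensitive"));
        System.out.println(props.containsKey("connector"));
    }
}
```

Under this assumption, 'debezium.database.url' in the WITH clause has the same effect as properties.put("database.url", ...) in the API example.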
    
    
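    Case sensitivity on Oracle 11g
   
    By default, when checking a table's status on Oracle 11g, the connector converts the table name to lowercase, so the lookup for the table's supplemental logging finds nothing. Set the database.tablename.case.insensitive property to false so that the table name is no longer lowercased during validation; the check for supplemental logging then succeeds. For Oracle 11g, table and schema names must be uppercase.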
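The effect can be illustrated with a small simulation. It assumes, as described above, that the names being checked are stored in uppercase and that the case-insensitive setting lowercases the name before the lookup. The in-memory set stands in for the data dictionary, and TableNameCaseSketch is a hypothetical name, not connector code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public class TableNameCaseSketch {

    // Simulated lookup: the name is lowercased first when caseInsensitive is true.
    static boolean hasSupplementalLog(Set<String> loggedTables, String tableName, boolean caseInsensitive) {
        String lookup = caseInsensitive ? tableName.toLowerCase(Locale.ROOT) : tableName;
        return loggedTables.contains(lookup);
    }

    public static void main(String[] args) {
        // Names are stored in uppercase in this simulated dictionary.
        Set<String> loggedTables = new HashSet<>(Arrays.asList("GSP_MODULE_UAT.GSP_PURCHASE_ORDER"));

        // With lowercasing, the lookup misses even though logging is enabled.
        System.out.println(hasSupplementalLog(loggedTables, "GSP_MODULE_UAT.GSP_PURCHASE_ORDER", true));
        // Without lowercasing, the uppercase name matches.
        System.out.println(hasSupplementalLog(loggedTables, "GSP_MODULE_UAT.GSP_PURCHASE_ORDER", false));
    }
}
```

The first call prints false and the second prints true, which is the difference the property setting makes in this simplified model.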
   
    
    
    Changing it through the API
   
properties.put("database.tablename.case.insensitive","false");
    
    
    Changing it through SQL
   
Follow the same approach as for the first problem.
 
