1.11.0 flinksql自定义kafka源(支持多个topic)

  • Post author:
  • Post category:其他


目前 Flink 1.11.0 自带的 Kafka SQL 连接器还不支持在一张表中订阅多个 topic,要实现这个功能需要自定义源。这里基于已有的 Kafka connector 进行扩展,需要先引入如下依赖:

<!-- Existing Flink Kafka connector (version 1.11.0) that the custom multi-topic source builds on. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

Flink 1.11 相比之前的版本重构了自定义 connector 的实现方式,引入了 DynamicTable 相关接口,详情见官网文档。

1 创建connectorFactory,继承KafkaDynamicTableFactoryBase

/**
 * Table factory for the {@code topicsKafka} connector.
 *
 * <p>Sources are created as {@link TopicsKafkaTableSource}; sinks are the stock
 * {@link KafkaDynamicSink}. The factory is registered by listing its class name in
 * {@code META-INF/services/org.apache.flink.table.factories.Factory}.
 */
public class TopicsKafkaTableConnecterFactory extends KafkaDynamicTableFactoryBase {
    /** Value of the {@code 'connector'} table option that selects this factory. */
    public static final String IDENTIFIER = "topicsKafka";

    public TopicsKafkaTableConnecterFactory() {}

    /**
     * Builds the table source for this connector.
     *
     * @param topic the raw value of the {@code 'topic'} option; it is handed to
     *              {@link TopicsKafkaTableSource} unchanged
     * @return a new {@link TopicsKafkaTableSource} carrying all given settings
     */
    @Override
    protected KafkaDynamicSourceBase createKafkaTableSource(DataType producedDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
        return new TopicsKafkaTableSource(producedDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
    }

    /**
     * Builds the table sink for this connector.
     *
     * @return a stock {@link KafkaDynamicSink}; no multi-topic handling is added on the sink side
     */
    @Override
    protected KafkaDynamicSinkBase createKafkaTableSink(DataType consumedDataType, String topic, Properties properties, Optional<FlinkKafkaPartitioner<RowData>> partitioner, EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
        return new KafkaDynamicSink(consumedDataType, topic, properties, partitioner, encodingFormat);
    }

    /**
     * @return {@link #IDENTIFIER}, so the constant is the single place the connector name is defined
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
}

2 创建TopicsKafkaTableSource,继承KafkaDynamicSourceBase

/**
 * Kafka table source whose {@code topic} setting may name several topics,
 * separated by commas (for example {@code topic_a,topic_b}).
 */
public class TopicsKafkaTableSource extends KafkaDynamicSourceBase {

    /**
     * Passes every argument straight to the base class.
     *
     * @param topic a single topic name or a comma-separated list of topic names
     */
    protected TopicsKafkaTableSource(DataType outputDataType, String topic, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) {
        super(outputDataType, topic, properties, decodingFormat, startupMode, specificStartupOffsets, startupTimestampMillis);
    }

    /**
     * Creates a copy of this source.
     *
     * <p>The copy must be a {@code TopicsKafkaTableSource} as well; returning a plain
     * {@code KafkaDynamicSource} would drop the multi-topic
     * {@link #createKafkaConsumer} override defined in this class.
     *
     * @return a new {@code TopicsKafkaTableSource} with the same settings
     */
    @Override
    public DynamicTableSource copy() {
        return new TopicsKafkaTableSource(this.outputDataType, this.topic, this.properties, this.decodingFormat, this.startupMode, this.specificStartupOffsets, this.startupTimestampMillis);
    }

    @Override
    public String asSummaryString() {
        return "Kafka";
    }

    /**
     * Creates a consumer subscribed to every topic named in {@code s}.
     *
     * @param s                     a single topic name or a comma-separated list of topic names;
     *                              whitespace around the names is ignored
     * @param properties            Kafka client properties handed to the consumer
     * @param deserializationSchema schema handed to the consumer for decoding records
     * @return a {@link FlinkKafkaConsumer} built from the resulting topic list
     */
    @Override
    protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(String s, Properties properties, DeserializationSchema<RowData> deserializationSchema) {
        // Split the topic names on commas; the regex also swallows whitespace around each
        // comma so that "a, b" yields "a" and "b" rather than "a" and " b".
        List<String> topics = Arrays.asList(s.trim().split("\\s*,\\s*"));
        return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
    }

}

3 配置

在项目资源文件夹下创建文件

META-INF/services/org.apache.flink.table.factories.Factory

文件内容:

写入上面创建的 TopicsKafkaTableConnecterFactory 的全限定类名(包名 + 类名),例如 com.example.TopicsKafkaTableConnecterFactory(包名请替换为实际所在的包)

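4 Flink SQL 使用

'connector' 填写 topicsKafka;'topic' 可以填写单个 topic,也可以填写多个 topic,多个 topic 之间用英文逗号分隔,例如 'topic_a,topic_b'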

-- Source table read through the custom topicsKafka connector.
CREATE TABLE mySource (
		a bigint,
		b bigint
	) WITH (
	'connector' = 'topicsKafka',
	-- The custom source splits this value on commas, so several topics may be listed, e.g. topic_a,topic_b
	'topic' = 'mytesttopic',
	'properties.bootstrap.servers' = '172.17.0.2:9092',
	'properties.group.id' = 'flink-test-cxy',
	'scan.startup.mode' = 'latest-offset',
	'format' = 'json'
	);
-- Sink table. The print connector is active and the JDBC options are kept commented out.
CREATE TABLE mysqlsink (
		id bigint,
		b varchar
	)
	with (
	    'connector' = 'print'
		/*'connector.type' = 'jdbc',
		'connector.url' = 'jdbc:mysql://...' ,
		'connector.username' = 'root' ,
		'connector.password' = 'root',
		'connector.table' = 'mysqlsink' ,
		'connector.driver' = 'com.mysql.cj.jdbc.Driver' ,
		'connector.write.flush.interval' = '5s',
		'connector.write.flush.max-rows' = '1'*/
	);
-- Copy rows from the source into the sink, casting column b to varchar.
insert into mysqlsink select a , cast(b as varchar) b from mySource;



版权声明:本文为u010034713原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。