Flink–Source(Kafka)

  • Post author:
  • Post category:其他

从kafka读取数据

依赖

kafka和flink是非常契合的。
若flink想和kafka建立连接,需要导入新的依赖

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
     <version>1.10.1</version>
</dependency>

addSource

之前从文件(readTextFile)、集合(fromCollection)属于特殊化方法。
当调用这种方法的时候,数据源已经被限定死了。

和kafka建立连接需要使用addSource,这是一般法方法,它对应的源是不固定的。

第一种实现方式

addSource[T](function:SourceContext[T] =>Unit):DataStream[T]

需要传入一个匿名函数,传入一个source上下文,没有返回值。
因为没有返回值,所以里边会生成数据并利用source上下文发出来。
最终得到一个DataStream[T]。后续都是关于DataStream[T]的转换
自己实现这么一个函数无疑是困难的,还有另外一中方式。

第二种实现方式

addSource[T](function:SourceFunction[T] ):DataStream[T]

首先,这里的SourceFunction是一个java接口。
所以这里需要一个 实现了此接口的类。是一个类似于函数功能的函数类
然后,SourceFunction接口中最核心的就是 SourceContext
接口中必须要实现的是两个方法cancel 和run
cancel方法需要在想要停止发送数据的时候调用。
run方法主要用于源源不断的读取数据。

与kafka建立连接

要和kafka建立连接。首先引入连接器依赖
连接器依赖已经实现了flink-kakfal连接类
之后addsource中直接创建一个连接类
因为要接收kafka数据 所以选择消费者

继承关系

FlinkKafkaConsumer11
FlinkKafkaConsumer10
FlinkKafkaConsumer09
FlinkKafkaConsumerBase
RichParallelSourceFunction
ParallelSourceFunction
SourceFunction
发现最终是继承于SourceFunction

继承关系

[ String ] 泛型 :表示从kafka读取的数据类型
( string topic , scheme 值的反序列化器,consumer的配置项 )
new SimpleStringSchema() 反序列器会直接当成普通的字符串去解析。

    val properties = new Properties()
    properties.setProperty("bootstrap.servers","localhost:9092")
    properties.setProperty("group.id","consumer-group")

    val kafkaData: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("Sennir",new SimpleStringSchema(),properties))

之后就是打印输出

完整代码

package com.erke.apitest

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaConsumer09}

object SourceTest {
  def main(args: Array[String]): Unit = {

    //创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers","localhost:9092")
    properties.setProperty("group.id","consumer-group")
    
	// 与kafka建立连接
    val kafkaData: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("Sennir",new SimpleStringSchema(),properties))
	
	//打印
    kafkaData.print()

    //执行
    env.execute("apitest")


  }
}

版权声明:本文为MINGZHEFENG原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。