Apache Flink (3): Connecting Flink to DataSources


DataSource

A DataSource specifies the input of a streaming job. Data sources are attached through the addSource() method of the StreamExecutionEnvironment. Flink ships with a number of predefined DataSource implementations; if you need a custom source, you can implement the SourceFunction interface (non-parallel source), implement the ParallelSourceFunction interface (parallel source), or extend RichParallelSourceFunction (parallel source with access to the rich lifecycle methods and the runtime context, so it can work with state).
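
A full ParallelSourceFunction example appears in the custom data source section below; for the RichParallelSourceFunction case, a minimal sketch might look like the following (the class name and the emitted records are purely illustrative, not part of the original example):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

    // Illustrative only: a parallel source with the rich lifecycle hooks (open/close)
    class UserDefineRichSource extends RichParallelSourceFunction[String] {

      @volatile private var isRunning = true

      // open() runs once per parallel subtask before run(); connections or state can be set up here
      override def open(parameters: Configuration): Unit = {}

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        while (isRunning) {
          // getRuntimeContext is only available in the Rich* variants
          ctx.collect("record from subtask " + getRuntimeContext.getIndexOfThisSubtask)
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }

      // close() runs when the subtask shuts down
      override def close(): Unit = {}
    }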



File Based: text files as the input source

readTextFile(path) – reads a text file; under the hood the data is read line by line via TextInputFormat, returning a DataStream[String] – the file is processed only once.

    // Requires the Flink Scala API implicits: import org.apache.flink.streaming.api.scala._
    //1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2. Create the DataStream from a text file
    val filePath = "file:///D:\\data"
    val dataStream: DataStream[String] = fsEnv.readTextFile(filePath)
    //3. Transform the data
    dataStream.flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

readFile(fileInputFormat, path) – reads the file using the specified input format – the file is processed only once.

    //1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2. Create the DataStream with an explicit input format
    val filePath = "file:///D:\\data"
    val inputFormat = new TextInputFormat(null) // the file path is supplied via readFile below
    val dataStream: DataStream[String] = fsEnv.readFile(inputFormat, filePath)
    //3. Transform the data
    dataStream.flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

readFile(fileInputFormat, path, watchType, interval, pathFilter) – the two methods above both delegate to this method under the hood.

    //1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2. Create the DataStream
    val filePath = "file:///D:\\data"
    val inputFormat = new TextInputFormat(null)
    
    // filterPath returns true for paths that should be EXCLUDED from processing
    inputFormat.setFilesFilter(new FilePathFilter {
      override def filterPath(path: Path): Boolean = {
        if(path.getName().startsWith("1")){ // exclude files whose name starts with "1"
          return true
        }
        false
      }
    })
    val dataStream: DataStream[String] = fsEnv.readFile(inputFormat, filePath,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
    //3. Transform the data
    dataStream.flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

The path is scanned periodically; if a file's content is modified, that file is re-read in its entirety, so duplicate results may be produced.
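
If re-reading modified files is not desired, the watch mode can be set to scan the path only once; a minimal sketch reusing the inputFormat and filePath from the snippet above (this is effectively what readTextFile and the two-argument readFile do):

    // PROCESS_ONCE scans the path a single time and then the source finishes;
    // the interval argument is ignored in this mode
    val onceStream: DataStream[String] = fsEnv.readFile(inputFormat, filePath,
      FileProcessingMode.PROCESS_ONCE, 1000)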



Collection: a collection as the data source

    //1. Create the StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2. Create the DataStream from an in-memory collection
    val dataStream: DataStream[String] = fsEnv.fromCollection(List("this is a demo","hello world"))
    //3. Transform the data
    dataStream.flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")
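
For a handful of literal values, fromElements behaves the same way; a minimal sketch (the variable name is illustrative):

    // fromElements builds a bounded stream directly from the given values
    val elementStream: DataStream[String] = fsEnv.fromElements("this is a demo", "hello world")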



Custom data source

import scala.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

class UserDefineDataSource extends ParallelSourceFunction[String]{

  val lines = Array("Hello Flink", "Hello Spark", "Hello Scala")

  @volatile
  var isRunning = true

  // Emit records until the source is cancelled
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(1000)
      sourceContext.collect(lines(new Random().nextInt(lines.length)))
    }
  }

  // Called when the job is cancelled; stops the emit loop
  override def cancel(): Unit = {
    isRunning = false
  }

}
object FlinkUserDefineSource {

  def main(args: Array[String]): Unit = {

    // 1. Create the StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Add the user-defined source
    val dataStream : DataStream[String] = flinkEnv.addSource[String](
      new UserDefineDataSource
    )

    dataStream
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()

    // Trigger execution
    flinkEnv.execute("FlinkWordCount")

  }

}
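
Since UserDefineDataSource implements ParallelSourceFunction, the parallelism of the source can be raised explicitly; a minimal sketch reusing flinkEnv from the main method above (the value 4 is just an illustration):

    // Each of the 4 parallel subtasks runs its own instance of UserDefineDataSource
    val parallelStream: DataStream[String] = flinkEnv
      .addSource(new UserDefineDataSource)
      .setParallelism(4)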



Connecting Flink to a Kafka data source

Add the required dependency

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.8.1</version>
    </dependency>

Example code

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

object FlinkKafkaSourceSimple{

  def main(args: Array[String]): Unit = {

    // 1. Create the StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Create the DataStream from Kafka
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

    // SimpleStringSchema only exposes the value of each Kafka record
    val dataStream : DataStream[String] = flinkEnv.addSource[String](
      new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), prop)
    )

    dataStream
        .flatMap(_.split("\\s+"))
        .map((_, 1))
        .keyBy(0)
        .sum(1)
        .print()

    // Trigger execution
    flinkEnv.execute("FlinkWordCount")

  }

}

The code above can only access the record value. If the key, offset, partition, or other metadata is needed, a custom KafkaDeserializationSchema must be provided.

Retrieving Kafka record metadata

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.flink.util.StringUtils // assumed: Flink's StringUtils providing arrayToString
import org.apache.kafka.clients.consumer.ConsumerRecord

class UserDefineKafkaSchema extends KafkaDeserializationSchema[(Int, Long, String, String, String)]{

  override def isEndOfStream(t: (Int, Long, String, String, String)): Boolean = {
    false
  }

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]):
  (Int, Long, String, String, String) = {
    // Guard against a null key
    if(consumerRecord.key() == null){
      (consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
        "", new String(consumerRecord.value()))
    }else{
      (consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
        StringUtils.arrayToString(consumerRecord.key()), new String(consumerRecord.value()))
    }

  }

  // Tell Flink the type of the produced tuples
  override def getProducedType: TypeInformation[(Int, Long, String, String, String)] = {
    createTypeInformation[(Int, Long, String, String, String)]
  }

}
    

Example code


import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

object FlinkKafkaSourceComplex {

  def main(args: Array[String]): Unit = {

    // 1. Create the StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Create the DataStream from Kafka
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

    // Exposes partition, offset, topic, key and value for every record
    val dataStream = flinkEnv.addSource[(Int, Long, String, String, String)](
      new FlinkKafkaConsumer[(Int, Long, String, String, String)]("flink", new UserDefineKafkaSchema, prop)
    )

    dataStream.print()

    // Trigger execution
    flinkEnv.execute("FlinkWordCount")

  }

}
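
If the job should not start from the consumer group's committed offsets, the start position can be configured on the FlinkKafkaConsumer before it is added as a source. A minimal sketch against the 1.8 connector API, reusing prop from the example above and the SimpleStringSchema from the first Kafka example (variable names are illustrative):

    // Read the topic from the earliest available offsets instead of the committed group offsets
    val kafkaConsumer = new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), prop)
    kafkaConsumer.setStartFromEarliest()
    val fromEarliest: DataStream[String] = flinkEnv.addSource(kafkaConsumer)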



Copyright notice: This is an original article by qq_38310603, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.