Flink(十)窗口计算

  • Post author:
  • Post category:其他




一、简介

窗口计算:flink的灵魂

窗口计算就是把无界数据流切分为有限大小的“bucket”—>窗口(bucket/window/panel),在窗口上应用计算换上完成计算处理


核心:

窗口的

划分



计算


我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。

大致程序结构:

按照有没有进行keyby分成了两种 不同的处理方式

  1. 首先,我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。
  2. windowAll不对数据流进行分组,所有数据将发送到后续执行的算子单个实例上。
  3. 经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。
// Keyed Window
stream
       .keyBy(...)               <-  按照一个Key进行分组
       .window(...)              <-  将数据流中的元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分组,将数据流中的所有元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

Flink窗口的骨架结构中有两个必须的两个操作:

  1. 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。
  2. 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常用的Window Function有reduce、2. aggregate、process。

    其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。

    在这里插入图片描述

    上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。



二、窗口的划分

window assigner–>窗口分配器:把无界数据流怎么做窗口的划分

窗口主要有两种,一种基于时间(Time-based Window),一种基于数量(Count-based Window)。本文主要讨论Time-based Window



2.1Tumbling Windows:滚动窗口

窗口大小是固定的、上一个窗口的结束是下一个窗口的开始(窗口不会重叠)

在这里插入图片描述

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * 通过滚动窗口对无界数据流进行窗口的切分
 * 以word count为需求实现
 */
object TumblingWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    //1.通过滚动窗口对无界数据流进行切分
    //2.滚动窗口的大小是5秒钟
    //3.ProcessingTime==》处理时间,以系统时钟作为时间基准====》现在先不管它
    //通过window算子对无界数据流进行窗口的切分:窗口分配器WindowAssigner
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}



2.2Sliding Windows:滑动窗口

窗口大小是固定的,窗口有可能有重叠。窗口会有一个滑动步长(上一个窗口开始,往后滑动一定的时间下一个窗口开始)

在这里插入图片描述

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SlidingWindowsAssignerJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)


    //1.window算子实现窗口的划分
    //2SlidingProcessingTimeWindows实现滑动窗口
    //3.滑动窗口有固定大小:10秒钟;窗口的滑动步长5秒钟
    //滑动窗口有可能会重叠
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}



2.3 Session Windows:会话窗口

窗口大小不固定,窗口之间会有一个间隙(gap).会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。

在这里插入图片描述

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SessionWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    //window算子实现窗口的划分
    //2.withGap方法里面的时间表示的是间隙。设置了间隙是5秒钟
    var windowedStream = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}



2.4 Global Window:全局窗口

整个数据流是一个窗口,因为数据流是无界的,所以全局窗口默认情况下,永远不会触发计算数据

在这里插入图片描述

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}

/**
 * 通过滚动窗口对无界数据流进行窗口的切分
 * 以word count为需求实现
 */
object GlobalWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)


    //window算子进行窗口的划分
    //globalWindows就是把整个无界数据流划分为一个窗口
    //因为无界数据流没有结束,全局窗口也没有结束,所以全局窗口默认的是永远不会触发计算函数
    val windowedStream: WindowedStream[(String, Int), String, GlobalWindow] = keyedStream
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of[GlobalWindow](3))//这是一个数量触发器;窗口中的元素个数到达了规定的值3,就会触发执行

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => {
      println("***********")
      (elem._1, sum._2 + elem._2)
    })

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}



三、窗口计算函数

数据经过了window和WindowAssigner之后,已经被分配到不同的窗口里,接下来,我们要通过窗口函数,在每个窗口上对窗口内的数据进行处理。窗口函数主要分为两种,一种是增量计算,如reduce和aggregate,一种是全量计算,如process。

  1. 增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。
  2. 全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算



3.1reduceFunction

reduceFunction 做的是

增量

计算—》每过来一个数据,就会执行一次计算。到窗口结束,触发计算的时候,就把计算的结果发送出去

使用reduce算子时,我们要重写一个ReduceFunction。

它接受两个相同类型的输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。在窗口上进行reduce的原理与之类似,只不过多了一个窗口状态数据,这个状态数据的数据类型和输入的数据类型是一致的,是之前两两计算的中间结果数据。当数据流中的新元素流入后,ReduceFunction将中间结果和新流入数据两两合一,生成新的数据替换之前的状态数据。

使用reduce的好处是窗口的状态数据量非常小,实现一个ReduceFunction也相对比较简单,可以使用Lambda表达式,也可以重写函数。缺点是能实现的功能非常有限,因为中间状态数据的数据类型、输入类型以及输出类型三者必须一致,而且只保存了一个中间状态数据,当我们想对整个窗口内的数据进行操作时,仅仅一个中间状态数据是远远不够的。

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ReduceFunctionJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction)

    result.print()

    environment.execute("reduceFunctionJob")
  }

}

class  MyReduceFunction extends ReduceFunction[(String, Int)]{
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {

//    println(value1+"***"+value2)

    (value2._1,value2._2+value1._2)
  }
}



3.2 processWindowFunction

processWindowFunction做的是

批量

计算—》每过来一个数据,就会把数据存储起来。到窗口结束,触发计算的时候,把所有的数据获取到进行一次性计算

可以获取到窗口的元数据信息—》窗口的开始时间,结束时间…

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object ProcessWindowFunctionJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)
    result.print()
    environment.execute("processWindowFunctionJob")
  }
}
//四个类型参数
//In:输入数据的类型
//Out:输出数据的类型
//Key: Key的类型,keyBy所对应的类型。keyedStream[类型1,类型2]-->类型2就是key的类型
//Window:窗口的类型,分成了两种,一种是时间窗口,一种是全局窗口
//class MyProcessWindowFunction extends ProcessWindowFunction[In,Out,Key,Window]
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int),(String, Int),String,TimeWindow]{
  //处理数据
  /**
   *
   * @param key
   * @param context 上下文
   * @param elements 这个参数是一个集合,就是把所有落入到窗口的数据先暂存起来;当窗口触发计算函数的时候,再从这个集合中读取所有的数据进行计算处理
   * @param out
   */
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
    //1.word count功能的实现
    val count: Int = elements.map(_._2).sum
    //2.获取元数据信息
    val timeWindow: TimeWindow = context.window
    //窗口的开始时间
    val start: Long = timeWindow.getStart
    //窗口的结束时间
    val end: Long = timeWindow.getEnd
    println(s"窗口时间:[${start}-${end}),窗口中的数据:${elements.mkString(",")}")
    //构建返回值
    out.collect((key,count))
  }
}



3.3 既要高效计算又要元数据信息

  • 高效计算就是指增量计算,是由ReduceFunction、aggregateFunction完成的
  • 元数据信息,是由ProcessWindowFunction完成的

    就应该把这两个放在一起使用
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ReduceAndProcessWindowFunctionJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction2, new MyProcessWindowFunction2)

    result.print()


    environment.execute("processWindowFunctionJob")
  }

}
class MyReduceFunction2 extends ReduceFunction[(String,Int)]{
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {

    println(value1+"***"+value2)

    (value2._1,value1._2+value2._2)
  }
}

class MyProcessWindowFunction2 extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {

    //elements这里面存储的不是一个一个的数据,而是reduceFunction计算完成的结果
    println(elements.mkString(","))//

    val timeWindow: TimeWindow = context.window

    val start: Long = timeWindow.getStart

    val end: Long = timeWindow.getEnd
    println(s"窗口时间:[${start}-${end})")


    val list: List[(String, Int)] = elements.toList
    out.collect(list(0)._1+"的个数是:"+list(0)._2)

  }
}



3.4在窗口计算中使用状态

  • 在窗口中使用状态:一个窗口对应一个状态,做数据的存储
  • 在窗口之间使用状态:把所有窗口的数据汇总起来
import org.apache.flink.api.common.state.{KeyedStateStore, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object StateJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)
    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
    val result: DataStream[String] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction3)
    result.print()
    environment.execute("stateJob")
  }
}
class MyProcessWindowFunction3 extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
  var vsdForEachWindow:ValueStateDescriptor[Int]=_
  var vsdForAllWindows:ValueStateDescriptor[Int]=_
  override def open(parameters: Configuration): Unit = {
    vsdForEachWindow=new ValueStateDescriptor[Int]("vsdfew",createTypeInformation[Int])
    vsdForAllWindows=new ValueStateDescriptor[Int]("vsdfaw",createTypeInformation[Int])
  }
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
    val count: Int = elements.map(_._2).sum
    //状态的使用
    //没有太大的意义,因为每个窗口的数据都进行了计算
    //针对每一个窗口
    val windowRuntimeContext: KeyedStateStore = context.windowState
    val valueStateForEachWindow: ValueState[Int] = windowRuntimeContext.getState(vsdForEachWindow)
    //全局的
    val globalRuntimeContext: KeyedStateStore = context.globalState
    //把所有窗口的数据都累计起来
    val valueStateForAllWindows: ValueState[Int] = globalRuntimeContext.getState(vsdForAllWindows)
    //从状态中读取之前所有窗口计算的单词的个数
    val oldCount: Int = valueStateForAllWindows.value()
    valueStateForAllWindows.update(oldCount+count)
    println(s"到目前位置,${key}的个数是:"+valueStateForAllWindows.value())
    out.collect(key+"个数是:"+count)

  }
}



3.5Trigger

触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去;全量计算窗口函数需要将窗口内的元素缓存,Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去。每个窗口都有一个默认的Trigger,比如前文这些例子都是基于Processing Time的时间窗口,当到达窗口的结束时间时,Trigger以及对应的计算被触发。如果我们有一些个性化的触发条件,比如窗口中遇到某些特定的元素、元素总数达到一定数量或窗口中的元素到达时满足某种特定的模式时,我们可以自定义一个Trigger。我们甚至可以在Trigger中定义一些提前计算的逻辑,比如在Event Time语义中,虽然Watermark还未到达,但是我们可以定义提前计算输出的逻辑,以快速获取计算结果,获得更低的延迟。

我们先看Trigger返回一个什么样的结果。当满足某个条件,Trigger会返回一个名为TriggerResult的结果:

  • CONTINUE:什么都不做。
  • FIRE:启动计算并将结果发送给下游,不清理窗口数据。
  • PURGE:清理窗口数据但不执行计算。
  • FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据。

    在继续介绍Trigger的使用之前,我们可以先了解一下定时器(Timer)的使用方法。我们可以把Timer理解成一个闹钟,使用前先注册未来一个时间,当时间到达时,就像闹钟会响一样,程序会启用一个回调函数,来执行某个时间相关的任务。对于自定义Trigger来说,我们需要考虑注册时间的逻辑,当到达这个时间时,Flink会启动Window Function,清理窗口数据。

WindowAssigner都有一个默认的Trigger。比如基于Event Time的窗口会有一个EventTimeTrigger,每当窗口的Watermark时间戳到达窗口的结束时间,Trigger会发送FIRE。此外,ProcessingTimeTrigger对应Processing Time窗口,CountTrigger对应Count-based窗口。

下面以一个提前计算的案例来解释如何使用自定义的Trigger。在股票或任何交易场景中,我们比较关注价格急跌的情况,默认窗口长度是60秒,如果价格跌幅超过5%,则立即执行Window Function,如果价格跌幅在1%到5%之内,那么10秒后触发Window Function。

class MyTrigger extends Trigger[StockPrice, TimeWindow] {

  override def onElement(element: StockPrice,
                         time: Long,
                         window: TimeWindow,
                         triggerContext: Trigger.TriggerContext): TriggerResult = {
    val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))

    // 设置返回默认值为CONTINUE
    var triggerResult: TriggerResult = TriggerResult.CONTINUE

    // 第一次使用lastPriceState时状态是空的,需要先进行判断
    // 状态数据由Java端生成,如果是空,返回一个null
    // 如果直接使用Scala的Double,需要使用下面的方法判断是否为空
    if (Option(lastPriceState.value()).isDefined) {
      if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
        // 如果价格跌幅大于5%,直接FIRE_AND_PURGE
        triggerResult = TriggerResult.FIRE_AND_PURGE
      } else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
        val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % 10 * 1000))
        // 给10秒后注册一个Timer
        triggerContext.registerProcessingTimeTimer(t)
      }
    }
    lastPriceState.update(element.price)
    triggerResult
  }

  // 我们不用EventTime,直接返回一个CONTINUE
  override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPrice", classOf[Double]))
    lastPrice.clear()
  }
}

senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val input: DataStream[StockPrice] = ...

val average = input
      .keyBy(s => s.symbol)
      .timeWindow(Time.seconds(60))
      .trigger(new MyTrigger)
      .aggregate(new AverageAggregate)



3.6Evictor

evict:剔除器,在计算函数触发计算之前/之后,可以把窗口中的数据剔除掉

keyedStream.window.

evictor(evictor)

Flink提供了Evictor.Flink提供了很多具体的Evictor实现类

  • CountEvictor:数量剔除器,当窗口中数据的数量达到规定的值,进行数据的剔除

    如果Flink提供的Evictor不能满足业务需要,可以自定义Evictor
  • 写一个类,实现Evictor接口
  • 重写里面的方法
/**
    * T为元素类型
    * W为窗口
  */
public interface Evictor<T, W extends Window> extends Serializable {
    /**
     * 在Window Function前调用
   */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * 在Window Function后调用
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    /**
     * Evictor的上下文
     */
    interface EvictorContext {
        long getCurrentProcessingTime();
        MetricGroup getMetricGroup();
        long getCurrentWatermark();
    }
}

evictBefore和evictAfter分别在Window Function之前和之后被调用,窗口的所有元素被放在了Iterable<TimestampedValue>,我们要实现自己的清除逻辑。当然,对于增量计算的ReduceFunction和AggregateFunction,我们没必要使用Evictor。

Flink提供了几个实现好的Evictor:

CountEvictor保留一定数目的元素,多余的元素按照从前到后的顺序先后清理。

TimeEvictor保留一个时间段的元素,早于这个时间段的元素会被清理。



版权声明:本文为weixin_44079636原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。