Flink Watermark分配策略

  • Post author:
  • Post category:其他

Flink Watermark分配策略

WaterMark是Flink为了处理Event Time窗口计算提出的一种机制,本质上是一种时间戳。主要用来处理乱序数据或者延迟数据的,这里通常watermark机制结合window来实现(watermark用来触发window的计算

说到Event Time,就要提到Flink的三种时间:

  • 事件时间(Event Time): 事件现实发生的事件,是事件数据本身的一个属性(类似订单的创建时间)。使用这个语义时需要指定数据哪个字段表示时间,必须设置Watermark。使用EventTime时,数据可能是乱序的。

  • 处理时间(Processing Time): 数据流进入到具体某个算子时的系统时间。不需要Watermark机制,只依赖当前节点的操作系统时间

  • 提取时间(Ingestion Time): 数据进入Flink系统的时间,也就是flink读取数据源的时间(source)。从source到下游各个算子可能有多个计算环节,任何一个算子处理速度的快慢都可能影响下游算子的Processing Time。而Ingestion Time定义的是数据流入flink系统的时间,不会被下游的处理速度影响,因此ingestion Time是Event Time和Processing Time的一个折中方案同样不需要设置Watermark,也不需要太多缓存,延迟较低。

旧版本的Watermark分配策略

  • AssignerWithPeriodicWatermarks: 周期性生成watermark(可以取决于event time,或基于处理时间)
  • AssignerWithPunctuatedWatermarks: 为每一个元素生成watermark(每来一个元素都进行一次判断,更消耗性能)

通常情况下会使用第一种机制,除更节省性能外,也可以避免已到元素长时间等待

当watermark完全基于event time时,如果没有元素到达,则watermark不会被更新。当一段时间没有元素到达,那么窗口中已经到达的元素将会等待很久才会被输出

为了避免这种情况,可以使用周期性的watermark分配器,这些分配器不仅仅基于event time进行分配,比如,可以使用一个分配器,当一段时间没有接收到新的event时,则将当前时间作为watermark。

以常用的周期性策略举例

BoundedOutOfOrdernessTimestampExtractor类 是AssignerWithPeriodicWatermarks接口的一个实现类,实现extractTimestamp方法,指定元素的哪个属性作为时间戳,并返回。

socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>() {
        @Override
        public long extractTimestamp(String element) {
              return 0;   // 一般这里返回的是事件的一个时间属性
        }
})

新版本的Watermark分配策略

flink1.11版本后建议使用的watermark生成策略,当创建DataStream对象后,使用如下方法指定

assignTimestampsAndWatermarks(WatermarkStrategy[T])

WatermarkStrategy接口重写了两个方法createWatermarkGenerator,createTimestampAssigner,返回下边两个对象:

  • TimestampAssigner:主要指定流中数据元素的某个字段作为事件的时间戳,时间戳的分配是生成watermark的基础

  • WatermarkGenerator:主要负责按照既定方式,基于时间戳生成水位线,在WatermarkGzaienerator接口中有onEvent,onPeriodicEmit方法

源码

WatermarkStrategy的四种策略:

  • 固定乱序长度策略(forBoundedOutOfOrderness)
  • 单调递增策略(forMonotonousTimestamps)
  • 自定义策略(forGenerator):一般内置的分配策略已经能够满足需求,也可以自定义类实现WatermarkStrategy接口
  • 不生成策略(noWatermarks)

WatermarkStrategy的分配策略只需要实现WatermarkGenerator接口即可。该接口中有两个方法:

  • onEvent方法在接收到每一个事件数据时就会触发调用,该方法第一个参数event表示接收的事件数据,第二个参数eventTimeStamp表示事件时间戳,第三个参数output可以output.emitWatermark方法生成一个watermark。

  • onPeriodicEmit方法会周期性触发,比每个元素生成一个watermark效率高,接收一个watermarkOutput类型的参数output,内部可以用output.emitWatermark方法生成一个watermark。周期时间为处理时间,可以调用环境配置的env.setAutoWatermarkInterval() 方法来设置,默认为200ms

WatermarkGenerator

那么,就可以讲解下Flink内置的两种分配策略!!

forBoundedOutOfOrderness

需要注意的是,每来1条事件数据,只是更新了事件流的最大时间戳并不会直接发送水印
只有 {@link ExecutionConfig#getAutoWatermarkInterval()} 周期性间隔到了以后,水印才会被发送。

forMonotonousTimestamps
  • forMonotonousTimestamps使用AscendingTimestampsWatermarks类来实现,AscendingTimestampsWatermarks继承自BoundedOutOfOrdernessWatermarks

  • 相当于没有设置乱序延迟时间的forBoundedOutOfOrderness

案例
// 测试
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //周期性生成watermarkTest
        env.getConfig().setAutoWatermarkInterval(100);

        // 读取数据源,并行度为 1
        DataStream<Event> stream = env.fromElements(
                        new Event("Mary", "./home", 1000L),
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Alice", "./prod?id=200", 3500L),
                        new Event("Bob", "/prod?id=2", 2500L),
                        new Event("Alice", "./prod?id=300", 3600L),
                        new Event("Bob", "./home", 3000L),
                        new Event("Bob", "./prod?id=1", 2300L),
                        new Event("Bob", "./prod?id=3", 3300L))
                //有序的Watermarks
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
//                            @Override
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp; //默认是毫秒
//                            }
//                        }))

        //乱序的Watermarks
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        env.execute();
    }
}

WatermarkStrategyWithIdleness

WatermarkStrategyWithIdleness是空闲流的实现类,同样也实现了WatermarkGenerator接口,如果某时间段内没有记录在分区内流动,则该分区被标记为空闲,不会阻碍下游操作程序中水印的进度

如果某些分区数据很少,并且可能没有数据,那么空闲流的设置就显得很重要,如果没有空闲,这些流可以暂停整个事件时间的申请进度

//写一个小示例

public class TumblingEventWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple3<String,String, Long>> s = env.fromElements(
                Tuple3.of("1","", 1L),
                Tuple3.of("1","", 1L),
                Tuple3.of("1","", 2L),
                Tuple3.of("1","", 2L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            })
                .withIdleness(Duration.ofSeconds(50))  //表示50秒内没有元素到达,将该流标记为空闲流

        );

        env.execute();
    }
}

源码解析可以去Flink官网查看。。


版权声明:本文为weixin_47029065原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。