flink watermark源码解析

  • Post author:
  • Post category:其他



目录


watermark生成分为两种


图谱:


第一种:固定间隔时间 也是最常用的一种


第二种: 标记生成


不考虑SourceFunction生成wateramrk的方式,只分析DataStream.assign- TimestampsAndWatermarks方式的watermark生成

watermark生成分为两种

  1. 第一种为固定间隔时间生成,间隔通过env参数设置
  2. 第二种为标记生成可根据数据规则自定义什么时间生成watermark

图谱:

第一种:固定间隔时间 也是最常用的一种

测试代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dc  = env.socketTextStream("localhost", 9999,'\n',0);
dc.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(3)) {
    @Override
    public long extractTimestamp(String element) {
        Watermark currentWatermark = getCurrentWatermark();
        log.error("当前watermark:"+currentWatermark.getTimestamp());
        String[] split = element.split(",");
        return Long.parseLong(split[1]);
    }
}).print();

env.execute();

分析BoundedOutOfOrdernessTimestampExtractor类的源码,首先调用构造器 判断传入的 时间是不是<0, 然后将这个步长保存到maxOutOfOrderness(这里是3分钟换算为毫秒)中 初始化当前最大时间戳为long的最小值(-9223372036854595808)

public BoundedOutOfOrdernessTimestampExtractor2r(Time maxOutOfOrderness) {
    log.error("gouzao");
    if (maxOutOfOrderness.toMilliseconds() < 0) {
        throw new RuntimeException(
                "Tried to set the maximum allowed "
                        + "lateness to "
                        + maxOutOfOrderness
                        + ". This parameter cannot be negative.");
    }
    this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();//watermark的步长
    this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;//初始化当前最大时间戳为long的最小值+步长
}

接下来调用getCurrentWatermark 来计算watermark 这个方法是循环调用 循环时间可以通过env设置

@Override
public final Watermark getCurrentWatermark() {
    log.error("getCurrentWatermark,currentMaxTimestamp:{},maxOutOfOrderness:{},lastEmittedWatermark:{}",currentMaxTimestamp,maxOutOfOrderness,lastEmittedWatermark);
    // this guarantees that the watermark never goes backwards.
  //如果当前最大时间戳-步长>=上次的watermark 那就使用新的lastEmittedWatermark作为当前的
  //watermark,所以本质是:这个lastEmittedWatermark就是watermark包装的时间戳
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}

当来了一条数据时数据后会立即调用extractTimestamp方法,每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳

然后,又开始循环调用getCurrentWatermark,此时如果最大时间-步长后的时间 大于了上一次的watermark 则替换

@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
    log.error("extractTimestamp");
    long timestamp = extractTimestamp(element);
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;//每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳
    }
    return timestamp;
}

所以说对于watermark的时间戳而言是一个循环检测并修改的逻辑, getCurrentWatermark()方法一直循环,extractTimestamp方法来了数据才调用


整个流程有关键三个参数

1,当前流时间里数据的最大时间戳,currentMaxTimestamp,每次来数据的时候回调用方法来决定是不是更新这个数据

2,watermark的步长,maxOutOfOrderness,这个是固定的,每次计算当前watermark时要用当前最大的时间戳-步长

3,lastEmittedWatermark 实际上指的是实际的watermark,每次循环调用getCurrentWatermark的时候,都会用currentMaxTimestamp-步长来决定是不是要把上一次的watermark替换掉

测试结果:红色是getCurrentWatermark一直循环调用 100ms一次 不间断  黄色是来了数据后的调用

第二种: 标记生成

没有提供实现类,需要继承接口来自定义实现

做个最简单的测试能跑通就行:

static class PunctuatedTest implements AssignerWithPunctuatedWatermarks<String> {
  private  long waterMark=0L;
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
       log.error("调用checkAndGetNextWatermark,waterMark:{}",waterMark);
        String[] split = lastElement.split(",");
        if(split[0].equals("1")){
            long end = Long.parseLong(split[1]);
            waterMark=end>waterMark?end:waterMark;
        }
        return new Watermark(waterMark);//当满足上述条件的时候生成watermark,也就是说自定义逻辑中可以包含步长的规则来实现自定义生成watermark
    }


    public long getWaterMarkTm (){
        return waterMark;
    }


    @Override
    public long extractTimestamp(String element, long recordTimestamp) {
        log.error("调用extractTimestamp");
        return 0;
    }
}

测试结果是checkAndGetNextWatermark方法和extractTimestamp方法是在数据每来一条都调用一次来判断是否要生成watermark,具体两个方法要怎么实现就看具体要实现什么逻辑了



版权声明:本文为qq_45395046原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。