目录
不考虑SourceFunction生成wateramrk的方式,只分析DataStream.assign- TimestampsAndWatermarks方式的watermark生成
watermark生成分为两种
- 第一种为固定间隔时间生成,间隔通过env参数设置
- 第二种为标记生成可根据数据规则自定义什么时间生成watermark
图谱:
第一种:固定间隔时间 也是最常用的一种
测试代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dc = env.socketTextStream("localhost", 9999,'\n',0);
dc.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(3)) {
@Override
public long extractTimestamp(String element) {
Watermark currentWatermark = getCurrentWatermark();
log.error("当前watermark:"+currentWatermark.getTimestamp());
String[] split = element.split(",");
return Long.parseLong(split[1]);
}
}).print();
env.execute();
分析BoundedOutOfOrdernessTimestampExtractor类的源码,首先调用构造器 判断传入的 时间是不是<0, 然后将这个步长保存到maxOutOfOrderness(这里是3分钟换算为毫秒)中 初始化当前最大时间戳为long的最小值(-9223372036854595808)
public BoundedOutOfOrdernessTimestampExtractor2r(Time maxOutOfOrderness) {
log.error("gouzao");
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException(
"Tried to set the maximum allowed "
+ "lateness to "
+ maxOutOfOrderness
+ ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();//watermark的步长
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;//初始化当前最大时间戳为long的最小值+步长
}
接下来调用getCurrentWatermark 来计算watermark 这个方法是循环调用 循环时间可以通过env设置
@Override
public final Watermark getCurrentWatermark() {
log.error("getCurrentWatermark,currentMaxTimestamp:{},maxOutOfOrderness:{},lastEmittedWatermark:{}",currentMaxTimestamp,maxOutOfOrderness,lastEmittedWatermark);
// this guarantees that the watermark never goes backwards.
//如果当前最大时间戳-步长>=上次的watermark 那就使用新的lastEmittedWatermark作为当前的
//watermark,所以本质是:这个lastEmittedWatermark就是watermark包装的时间戳
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
当来了一条数据时数据后会立即调用extractTimestamp方法,每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳
然后,又开始循环调用getCurrentWatermark,此时如果最大时间-步长后的时间 大于了上一次的watermark 则替换
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
log.error("extractTimestamp");
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;//每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳
}
return timestamp;
}
所以说对于watermark的时间戳而言是一个循环检测并修改的逻辑, getCurrentWatermark()方法一直循环,extractTimestamp方法来了数据才调用
整个流程有关键三个参数
1,当前流时间里数据的最大时间戳,currentMaxTimestamp,每次来数据的时候回调用方法来决定是不是更新这个数据
2,watermark的步长,maxOutOfOrderness,这个是固定的,每次计算当前watermark时要用当前最大的时间戳-步长
3,lastEmittedWatermark 实际上指的是实际的watermark,每次循环调用getCurrentWatermark的时候,都会用currentMaxTimestamp-步长来决定是不是要把上一次的watermark替换掉
测试结果:红色是getCurrentWatermark一直循环调用 100ms一次 不间断 黄色是来了数据后的调用
第二种: 标记生成
没有提供实现类,需要继承接口来自定义实现
做个最简单的测试能跑通就行:
static class PunctuatedTest implements AssignerWithPunctuatedWatermarks<String> {
private long waterMark=0L;
@Override
public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
log.error("调用checkAndGetNextWatermark,waterMark:{}",waterMark);
String[] split = lastElement.split(",");
if(split[0].equals("1")){
long end = Long.parseLong(split[1]);
waterMark=end>waterMark?end:waterMark;
}
return new Watermark(waterMark);//当满足上述条件的时候生成watermark,也就是说自定义逻辑中可以包含步长的规则来实现自定义生成watermark
}
public long getWaterMarkTm (){
return waterMark;
}
@Override
public long extractTimestamp(String element, long recordTimestamp) {
log.error("调用extractTimestamp");
return 0;
}
}
测试结果是checkAndGetNextWatermark方法和extractTimestamp方法是在数据每来一条都调用一次来判断是否要生成watermark,具体两个方法要怎么实现就看具体要实现什么逻辑了