一、基于事件时间的窗口操作
窗口在10分钟分组聚合,每5分钟触发一次结果表,如上图,数据在12:00-12:05来临,在12:05会进行结果统计。数据在12:05-12:10到达时,不但要统计12:00-12:10的数据,还需要统计12:05-12:15的数据。在绿色结果表中可以清晰的看到加粗的横线将不同窗口的结果进行划分。
在Java中使用如下代码进行设置
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
二、处理延迟数据和水印
根据上图方案,当数据会延迟到达。
为了解决延迟到达的问题,spark 维护了一个窗口,如上图,12:04需要聚合的数据在12:11到达,但是该数据依然在12:00-12:10窗口中进行聚合。
在spark 2.1中 引入了
水印
技术,引擎自动追踪数据的当前事件事件,并相应的清除状态。可以通过指定事件时间列和根据事件时间预计数据延迟的阈值来定义查询的水印。对于在时间T结束的特定窗口,引擎将保持状态并允许后期数据更新状态,直到(引擎看到的最大事件时间-后期阈值> T)。换句话说,阈值内的延迟数据将被聚合,但超过阈值的晚数据将被删除。
下述例子Java代码:
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word"))
.count();
在这个例子中在,我们在列“timestamp”上定义了查询水印,定于允许数据延迟的阈值:10分钟。
如上图所示:在橙色实线上的延迟数据都会被计算在之前窗口中,但是在之下的代表超出了延迟阈值,进行清除。
根据上图,竖轴代表事件时间,横轴代表事件实际到达的时间,横轴和竖轴时间一致代表准时到达。在结果表触发点,黄色的水印会进行计算,来确定在接下来的5分钟内是否将延迟的数据进行清除或者时计算。
有些接收器(例如文件)可能不支持更新模式要求的细粒度更新。为了使用它们,我们还支持Append模式,在该模式中只有最终计数被写入sink。
如上图:不保留中间结果。
三、清除聚合状态的水印条件
清除聚合状态,水印必须满足下述条件:
- 输出模式必须时追加或者更细。完整模式需要保留所有的聚合数据,因此不能使用水印去除中间状态
- 聚合必须具有事件时间列,或者事件时间列上的窗口。
-
聚合使用的时间戳和调用
withWatermark()
方法使用的参数书时统一个,否则无效。例如
df.withWatermark("time", "1 min").groupBy("time2").count()
是无效的。 -
在对水印聚合之前,必须调用
withWatermark
,例如
df.groupBy("time").count().withWatermark("time", "1 min")
是无效的,在追加模式中。
四、基于水印的聚合语义保证
- 2小时的水印延迟(设置withWatermark)保证引擎永远不会丢弃延迟少于2小时的任何数据。换句话说,在事件时间方面,到那时为止处理的最新数据低于2小时的任何数据都保证被聚合。
- 然而,保证只在一个方向上严格。延迟超过2小时的数据不保证被丢弃;它可能会聚集,也可能不会聚集。数据延迟越久,引擎处理数据的可能性就越小。
总结
本文只要讲述了一些spark水印的一些技术。上述文章均来源于spark官网,如有理解错误指出,还请指正,一块进步。