1 方案
1.创建记录用户消费的ConsumerMess类:包括三个属性:用户ID、消费金额、消费时间
case class ConsumerMess(userId:Int, spend:Double, Time:Long)
2.要求采用EventTime时间语义,可通过以下设置:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
3.考虑到数据存在乱序的可能性,需要使用Watermark机制处理乱序数据,并设置允许的最大延迟时间。
- 使用固定时间间隔的Timestamp Assigner指定时间戳(ConsumerMess.Time)和Watermark(延迟2s)。
- 使用翻滚窗口,窗口长度为T=10s。
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ConsumerMess](Time.seconds(2)) {
override def extractTimestamp(element: ConsumerMess): Long = {
element.Time
}
})
.keyBy(_.userId)
.timeWindow(Time.seconds(10))
2 代码
package org.ourhome.streamapi
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author Do
* @Date 2020/4/25 21:33
*/
object EventTImeTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val socketData: DataStream[String] = env.socketTextStream("192.168.237.128", 9999)
socketData.print("input ")
socketData.map(line => {
val str: Array[String] = line.split(",")
ConsumerMess(str(0).toInt, str(1).toDouble, str(2).toLong)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ConsumerMess](Time.seconds(2)) {
override def extractTimestamp(element: ConsumerMess): Long = {
element.Time
}
})
.keyBy(_.userId)
.timeWindow(Time.minutes(10))
.reduce((a, b) => ConsumerMess(a.userId, a.spend + b.spend, b.Time))
.print("out ")
env.execute()
}
case class ConsumerMess(userId:Int, spend:Double, Time:Long)
}
3 测试
Socket发送数据:
第一个字段代表用户123,第二个字段为用户消费金额,第三个字段时间戳。
为方便测试,这里从2020-04-25 12:00:01(1587787201000)开始记录第一条数据,且第1分钟消费100,第2分钟消费200……以此类推:
123,100,1587787201000 2020-04-25 12:00:01
123,400,1587787204000 2020-04-25 12:00:04
123,300,1587787203000 2020-04-25 12:00:03
123,500,1587787205000 2020-04-25 12:00:05
123,200,1587787202000 2020-04-25 12:00:02
123,600,1587787206000 2020-04-25 12:00:06
123,800,1587787208000 2020-04-25 12:00:08
123,700,1587787207000 2020-04-25 12:00:07
123,1000,1587787210000 2020-04-25 12:00:10
123,900,1587787209000 2020-04-25 12:00:09
123,1100,1587787211000 2020-04-25 12:00:11
123,1200,1587787212000 2020-04-25 12:00:12
可以观察到数据是乱序到达Flink。启动Flink程序后,1到11分的数据进入窗口,并以进入的顺序被打印:(input为打印的输入数据)
input > 123,100,1587787201000
input > 123,400,1587787204000
input > 123,300,1587787203000
input > 123,500,1587787205000
input > 123,200,1587787202000
input > 123,600,1587787206000
input > 123,800,1587787208000
input > 123,700,1587787207000
input > 123,1000,1587787210000
input > 123,900,1587787209000
input > 123,1100,1587787211000
input > 123,1200,1587787212000
当
123,1200,1587787212000
这条数据进入窗口,其携带的时间戳为
1587787212000
(
2020-04-25 12:00:12
),延迟时间参数为2s,此时Watermark更新为12-2=10,也就是这条数据对应的Watermark是10s(Watermark(10)),就意味着10s前的数据都已经到了,且窗口内有数据,所以可以触发窗口计算并销毁窗口:(output为打印的结果)
output > ConsumerMess(123,4500.0,1587787209000)
打印出的结果显示,用户123窗口期内共消费4500元。通过计算可知4500是9分钟内消费的金额,但是我们的窗口长度明明是10,这是否正确呢?答案是正确的!
4 timeWindow 窗口边界问题
4.1 getWindowStartWithOffset解析
对于上述现象,自然而言有个疑问:窗口的起始时间是如何确定的呢???顺着源码看一看:
打开timeWindow函数源码,其调用的是javaStream的
timeWindow
方法:
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
new WindowedStream(javaStream.timeWindow(size))
}
接着,看一下timeWindow其内部。由于我们使用的是EventTIme时间语义,所以调用的是
TumblingEventTimeWindows.of(size)
方法:
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
这里有个start变量,也就是窗口的起始时间,start+size即窗口的结束时间。起始时间的计算调用的是
TimeWindow.getWindowStartWithOffset
方法:
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
最后来看看
getWindowStartWithOffset
,该方法有3个参数:
- timestamp:启动窗口的时间,单位毫秒
- offset:时间的偏移量,针对时区问题
- windowSize:窗口长度
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
现在,我们根据案例验证一下窗口的起始时间:
timestamp = 1587787201000 (2020-04-25 12:00:01)
offset = 0
windowSize = 10000
带入公式:
start = timestamp - (timestamp - offset + windowSize) % windowSize
得:
start = 1587787200000 (2020-04-25 12:00:00)
end = start + windowSize = 1587787210000 (2020-04-25 12:00:10)
所以:
-
从2020-04-25 12:00:00开始计算的滚动窗口的起始时间是从2020-04-25 12:00:00到2020-04-25 12:00:10(注意:左闭右开!),也就是
[12:00:00, 12:00:10)
-
再加上watermark要求的延迟2s,也就是
[12:00:00, 12:00:12)
即第一次窗口统计的0-9分钟的数据,也就解释了上一个部分的结果4500的合理性了!
4.2 继续验证
继续输入:
123,1300,1587787213000
123,1500,1587787215000
123,1700,1587787217000
123,2000,1587787220000
123,2100,1587787221000
123,2200,1587787222000
输出:
input > 123,1300,1587787213000
input > 123,1500,1587787215000
input > 123,1700,1587787217000
input > 123,2000,1587787220000
input > 123,2100,1587787221000
input > 123,2200,1587787222000
output > ConsumerMess(123,7800.0,1587787217000)
分析:
按照上面的分析,此次窗口的起始时间应该为
[12:00:10, 12:00:20)
,那么总消费金额应该为:1000+1100+1200+1300+1500+1700=7800,跟输出的结果是一致的!也就是说进入第一个窗口的12:00:10和12:00:11的数据会在第二个窗口触发计算时得到正确的计算,同时未进入第二个窗口的12:00:14、12:00:16、12:00:18和12:00:19的数据将会被遗弃。