有状态的Stream应用开发
流处理,以前我们的认知是来一条数据就处理一条,例如:解析某种编码的事件,将事件转换为更易读、已处理的编码格式。而当今天,我们要开发一个流式应用时,往往需要进行事件中间状态的存储。例如:我们需要每隔两秒计算出来最近1小时的访问流量。
Flink的特点就是有状态的流式处理。而有状态会让事情变得复杂起来。当流处理有状态时,一旦出现故障,就需要将出错之前的状态恢复回来,并且在对Flink集群进行扩展时,我们也需要将状态进行重新分配。还有一种需求,当一些计算指标存储在状态中时,我们希望通过HTTP这类方式,能够读取到状态中的数据,以用图形化的方式展示出来。
所以,对于Flink状态的开发、管理,是尤其重要的。
在Flink中有三种不同类型的State,一种是Keyed State、一种是Operator State,还有一种是Broadcast state。
Keyed State
Keyed State介绍
State是一种存储,在Flink中拥有不同结构的State。这里要说的Keyed State,也就是按照key-value形式来组织的State。
Keyd State是专门针对KeyedStream能够使用的state。例如 :针对以下的KeyedStream。
KeyedStream
<
OrderItem11
,
String
>
itemByKeyDS
=
orderItemDS
.keyBy(item -> item.getGoodsId());
换句话说,就是按照key进行分区之后的操作才能够使用Keyed State。Keyed state可以确保所有的状态更新都是本地操作,而且可以保证状态数据的一致性。因为一个key肯定属于一个分区,一个key的状态数据无需分布式存储。
使用Keyed State
每一个key对应有一个State,这个State可不简单。它的存储结构是非常丰富的。Flink官方把这些结构的操作称之为针对Keyed State的primitive(原语)。后边,我们也使用这个词来描述。
在Keyed State中,Flink支持的primitive有5种,我们分别来一下:
<1> ValueState<T>
ValueState也就是用于存储值的State,例如:我们可以将一个Integer或者是Double等这一类的类型存储在ValueState中。
<2> ListState<T>
表示可以将一个List数据存储在State中。它支持List的大部分操作,例如:添加一个元素到State中、获取元素、迭代列表或者更新整个列表。
<3> ReducingState<T>
Reuce表示用于保存聚合结果的状态。它的接口和ListState类似,但是ReducingState是用于做聚合计算的。
<4> AggregatingState<T>
类似于ReducingState,只不过计算的类型可以和元素的类型不一样。
<5> MapState<UK, UV>
可以将key-value存储。例如:可以通过put、get、keys、values等操作。
注意:所有的primitive都支持一个clear操作,可以将当前key的所有状态数据清除。
要获取状态,我们需要构建一个StateDescriptor,我们可以根据要创建的状态来选择不同的Descriptor。例如:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、MapStateDescriptor。通过context才可以获取到State,所以一般使用的是Rich Function中才能使用状态。
基于Keyed State的WordCount实现
接下来,我们使用Flink中的Keyed State来实现WordCount。因为WordCount需要将单词进行累加计数,所以此处,我们用ReducingState就比较方便一些,直接往State中添加单词,然后基于ReducingState实现一个Reduce Function。
<1> 把Flink的依赖导入进来:
<repositories>
<repository>
<id>aliyunmaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<properties>
<flink-version>1.12.0</flink-version>
<scala-version>2.12</scala-version>
<mysql-version>5.1.47</mysql-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala-version}</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala-version}</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>