Flink面向状态编程

  • Post author:
  • Post category:其他


有状态的Stream应用开发

流处理,以前我们的认知是来一条数据就处理一条,例如:解析某种编码的事件,将事件转换为更易读、已处理的编码格式。而当今天,我们要开发一个流式应用时,往往需要进行事件中间状态的存储。例如:我们需要每隔两秒计算出来最近1小时的访问流量。

Flink的特点就是有状态的流式处理。而有状态会让事情变得复杂起来。当流处理有状态时,一旦出现故障,就需要将出错之前的状态恢复回来,并且在对Flink集群进行扩展时,我们也需要将状态进行重新分配。还有一种需求,当一些计算指标存储在状态中时,我们希望通过HTTP这类方式,能够读取到状态中的数据,以用图形化的方式展示出来。

所以,对于Flink状态的开发、管理,是尤其重要的。

在Flink中有三种不同类型的State,一种是Keyed State、一种是Operator State,还有一种是Broadcast state。

Keyed State

Keyed State介绍

State是一种存储,在Flink中拥有不同结构的State。这里要说的Keyed State,也就是按照key-value形式来组织的State。

Keyd State是专门针对KeyedStream能够使用的state。例如 :针对以下的KeyedStream。


KeyedStream


<


OrderItem11


,


String


>


itemByKeyDS


=


orderItemDS


.keyBy(item -> item.getGoodsId());

换句话说,就是按照key进行分区之后的操作才能够使用Keyed State。Keyed state可以确保所有的状态更新都是本地操作,而且可以保证状态数据的一致性。因为一个key肯定属于一个分区,一个key的状态数据无需分布式存储。

使用Keyed State

每一个key对应有一个State,这个State可不简单。它的存储结构是非常丰富的。Flink官方把这些结构的操作称之为针对Keyed State的primitive(原语)。后边,我们也使用这个词来描述。

在Keyed State中,Flink支持的primitive有5种,我们分别来一下:

<1> ValueState<T>

ValueState也就是用于存储值的State,例如:我们可以将一个Integer或者是Double等这一类的类型存储在ValueState中。

<2> ListState<T>

表示可以将一个List数据存储在State中。它支持List的大部分操作,例如:添加一个元素到State中、获取元素、迭代列表或者更新整个列表。

<3> ReducingState<T>

Reuce表示用于保存聚合结果的状态。它的接口和ListState类似,但是ReducingState是用于做聚合计算的。

<4> AggregatingState<T>

类似于ReducingState,只不过计算的类型可以和元素的类型不一样。

<5> MapState<UK, UV>

可以将key-value存储。例如:可以通过put、get、keys、values等操作。

注意:所有的primitive都支持一个clear操作,可以将当前key的所有状态数据清除。

要获取状态,我们需要构建一个StateDescriptor,我们可以根据要创建的状态来选择不同的Descriptor。例如:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、MapStateDescriptor。通过context才可以获取到State,所以一般使用的是Rich Function中才能使用状态。

基于Keyed State的WordCount实现

接下来,我们使用Flink中的Keyed State来实现WordCount。因为WordCount需要将单词进行累加计数,所以此处,我们用ReducingState就比较方便一些,直接往State中添加单词,然后基于ReducingState实现一个Reduce Function。

<1> 把Flink的依赖导入进来:

<repositories>
    <repository>
        <id>aliyunmaven</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>

<properties>
    <flink-version>1.12.0</flink-version>
    <scala-version>2.12</scala-version>
    <mysql-version>5.1.47</mysql-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala-version}</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala-version}</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
  



版权声明:本文为ChinaPoison原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。