CkeckPoint优化

  • Post author:
  • Post category:其他



Flink 调优:Checkpoint 配置 | 码农家园 (codenong.com)


Flink Checkpoint 参数详解 – 宁君 – 博客园 (cnblogs.com)


(131条消息) Flink的数据持久化-CheckPoint机制_Relian哈哈的博客-CSDN博客_flink 持久化


(131条消息) Flink中State管理与恢复之CheckPoint的参数设置_乀曼巴丶小飞侠的博客-CSDN博客


1. Checkpoint 的配置

在 Flink 应用程序中配置 Checkpoint,首先需要开启 Checkpoint,同时指定 Checkpoint 的时间间隔。

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
         CheckPoint是通过给程序快照的方式使得将历史某些时刻的状态保存下来,当任务挂掉之后,默认从最近一次保存的完整快照处进行恢复任务
         通过 Checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
*/
        // 每 500s 做一次 checkpoint;
        env.enableCheckpointing(500000L);

Flink 默认提供 Extractly-Once 保证 State 的一致性。这可以通过 CheckpointConfig 进行设置,Flink 提供了 Extractly-Once,At-Least-Once 两种模式。

  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

由于每个 Flink 应用程序的 State 大小不同,StateBackend 也可能有所不同,所以,Checkpoint 需要根据实际情况配置。Flink 默认同一时间只允许执行一个 Checkpoint,以避免占用太多正常数据处理资源。如果在配置的 Checkpoint 时间间隔之内,一个 Checkpoint 正在生产,另一个 Checkpoint 也需要开始生产,那么,第二个 Checkpoint 将会等到第一个 Checkpoint 生产完成才会开始。

那么,如果很多 Checkpoint 生产时间过长,比 Checkpoint 时间间隔还要长,会有什么影响呢?在这种情况下,Checkpoint 的配置是不理想的,原因有二:

  1. 这种现象意味着应用程序的正常数据处理过程和 Checkpoint 的生产是并发的,资源是被二者共享的。所以,它将会拖慢正常数据的处理速度,无法全力消费数据,甚至导致数据的处理速度慢于数据的输入速度。
  2. 因为需要等待前一个 Checkpoint 生产完成,会使下一个 Checkpoint 延迟生产,从而导致在失败恢复时需要更长的追赶时间。

为了保证应用程序可以全力处理数据,你可以配置 Checkpoint 彼此之间的停顿时间。比如,你配置最小停顿时间是一分钟,那么在一个 Checkpoint 生产完成之后的头一分钟,不会有新的 Checkpoint 被拉起,这仅限于同时最多只有一个 Checkpoint 生产的情况。

该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,最终Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能。

 //同一时间,只允许 有 1 个 Checkpoint 在发生
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设定两个 Checkpoint 之间的最小时间间隔
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

如果某个应用程序的 Checkpoint 生产需要比较长的时间,但由于不用消耗很多资源,那么这时候是可以配置 Checkpoint 更高的并发量的:

//同一时间,允许 有 n 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(n);

值得注意的一点是:

Savepoint 是可以和 Checkpoint 并发生产的。即使有多个 Checkpoint 正在生产的过程中,Savepoint 也会并发生产

为了避免有 Checkpoint 生产时间过长,导致资源一直被占用,你可以给 Checkpoint 设置一个超时时间,Flink 默认的 Checkpoint 超时时间是 10 分钟。

env.getCheckpointConfig().setCheckpointTimeout(600000L);


Checkpoint 的配置原则

上一节介绍了 Checkpoint 的配置方法,以及 Checkpoint 时间间隔与 Checkpoint 生产时间的关系对 Flink 应用程序的影响。Checkpoint 的配置需要随着 Flink 应用程序的不同而不同。这里简单介绍一下 Checkpoint 的配置原则:


  1. Checkpoint 时间间隔不易过大

    。一般来说,Checkpoint 时间间隔越长,需要生产的 State 就越大。如此一来,当失败恢复时,需要更长的追赶时间。

  2. Checkpoint 时间间隔不易过小

    。如果 Checkpoint 时间间隔太小,那么 Flink 应用程序就会频繁 Checkpoint,导致部分资源被占有,无法专注地进行数据处理。

  3. Checkpoint 时间间隔大于 Checkpoint 的生产时间

    。当 Checkpoint 时间间隔比 Checkpoint 生产时间长时,在上次 Checkpoint 完成时,不会立刻进行下一次 Checkpoint,而是会等待一段时间,之后再进行新的 Checkpoint。否则,每次 Checkpoint 完成时,就会立即开始下一次 Checkpoint,系统会有很多资源被 Checkpoint 占用,而真正任务计算的资源就会变少。

  4. 开启本地恢复

    。如果 Flink State 很大,在进行恢复时,需要从远程存储上读取 State 进行恢复,如果 State 文件过大,此时可能导致任务恢复很慢,大量的时间浪费在网络传输方面。此时可以设置 Flink 应用程序本地 State 恢复,应用程序 State 本地恢复默认没有开启,可以设置参数 state.backend.local-recovery 值为 true 进行激活。

  5. 设置 Checkpoint 保存数

    。Checkpoint 保存数默认是 1,也就是只保存最新的 Checkpoint 的 State 文件,当进行 State 恢复时,如果最新的 Checkpoint 文件不可用时 (比如文件损坏或者其他原因),那么 State 恢复就会失败,如果设置 Checkpoint 保存数 3,即使最新的 Checkpoint 恢复失败,那么 Flink 也会回滚到上一次 Checkpoint 的状态文件进行恢复。考虑到这种情况,可以通过 state.checkpoints.num-retained 设置 Checkpoint 保存数。

Checkpoint 是 Flink 的失败恢复机制,它的配置对于 Flink 应用程序的性能和稳定性有着至关重要的影响。Checkpoint 需要根据 Flink 应用程序的不同而进行不同的配置,根据相关配置原则,以求达到理想的配置,使 Flink 应用程序性能和稳定性最优化。



版权声明:本文为m0_57320261原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。