Flink 调优:Checkpoint 配置 | 码农家园 (codenong.com)
Flink Checkpoint 参数详解 – 宁君 – 博客园 (cnblogs.com)
(131条消息) Flink的数据持久化-CheckPoint机制_Relian哈哈的博客-CSDN博客_flink 持久化
(131条消息) Flink中State管理与恢复之CheckPoint的参数设置_乀曼巴丶小飞侠的博客-CSDN博客
1. Checkpoint 的配置
在 Flink 应用程序中配置 Checkpoint,首先需要开启 Checkpoint,同时指定 Checkpoint 的时间间隔。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
CheckPoint是通过给程序快照的方式使得将历史某些时刻的状态保存下来,当任务挂掉之后,默认从最近一次保存的完整快照处进行恢复任务
通过 Checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
*/
// 每 500s 做一次 checkpoint;
env.enableCheckpointing(500000L);
Flink 默认提供 Extractly-Once 保证 State 的一致性。这可以通过 CheckpointConfig 进行设置,Flink 提供了 Extractly-Once,At-Least-Once 两种模式。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
由于每个 Flink 应用程序的 State 大小不同,StateBackend 也可能有所不同,所以,Checkpoint 需要根据实际情况配置。Flink 默认同一时间只允许执行一个 Checkpoint,以避免占用太多正常数据处理资源。如果在配置的 Checkpoint 时间间隔之内,一个 Checkpoint 正在生产,另一个 Checkpoint 也需要开始生产,那么,第二个 Checkpoint 将会等到第一个 Checkpoint 生产完成才会开始。
那么,如果很多 Checkpoint 生产时间过长,比 Checkpoint 时间间隔还要长,会有什么影响呢?在这种情况下,Checkpoint 的配置是不理想的,原因有二:
- 这种现象意味着应用程序的正常数据处理过程和 Checkpoint 的生产是并发的,资源是被二者共享的。所以,它将会拖慢正常数据的处理速度,无法全力消费数据,甚至导致数据的处理速度慢于数据的输入速度。
- 因为需要等待前一个 Checkpoint 生产完成,会使下一个 Checkpoint 延迟生产,从而导致在失败恢复时需要更长的追赶时间。
为了保证应用程序可以全力处理数据,你可以配置 Checkpoint 彼此之间的停顿时间。比如,你配置最小停顿时间是一分钟,那么在一个 Checkpoint 生产完成之后的头一分钟,不会有新的 Checkpoint 被拉起,这仅限于同时最多只有一个 Checkpoint 生产的情况。
该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多,最终Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能。
//同一时间,只允许 有 1 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设定两个 Checkpoint 之间的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
如果某个应用程序的 Checkpoint 生产需要比较长的时间,但由于不用消耗很多资源,那么这时候是可以配置 Checkpoint 更高的并发量的:
//同一时间,允许 有 n 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(n);
值得注意的一点是:
Savepoint 是可以和 Checkpoint 并发生产的。即使有多个 Checkpoint 正在生产的过程中,Savepoint 也会并发生产
。
为了避免有 Checkpoint 生产时间过长,导致资源一直被占用,你可以给 Checkpoint 设置一个超时时间,Flink 默认的 Checkpoint 超时时间是 10 分钟。
env.getCheckpointConfig().setCheckpointTimeout(600000L);
Checkpoint 的配置原则
上一节介绍了 Checkpoint 的配置方法,以及 Checkpoint 时间间隔与 Checkpoint 生产时间的关系对 Flink 应用程序的影响。Checkpoint 的配置需要随着 Flink 应用程序的不同而不同。这里简单介绍一下 Checkpoint 的配置原则:
-
Checkpoint 时间间隔不易过大
。一般来说,Checkpoint 时间间隔越长,需要生产的 State 就越大。如此一来,当失败恢复时,需要更长的追赶时间。 -
Checkpoint 时间间隔不易过小
。如果 Checkpoint 时间间隔太小,那么 Flink 应用程序就会频繁 Checkpoint,导致部分资源被占有,无法专注地进行数据处理。 -
Checkpoint 时间间隔大于 Checkpoint 的生产时间
。当 Checkpoint 时间间隔比 Checkpoint 生产时间长时,在上次 Checkpoint 完成时,不会立刻进行下一次 Checkpoint,而是会等待一段时间,之后再进行新的 Checkpoint。否则,每次 Checkpoint 完成时,就会立即开始下一次 Checkpoint,系统会有很多资源被 Checkpoint 占用,而真正任务计算的资源就会变少。 -
开启本地恢复
。如果 Flink State 很大,在进行恢复时,需要从远程存储上读取 State 进行恢复,如果 State 文件过大,此时可能导致任务恢复很慢,大量的时间浪费在网络传输方面。此时可以设置 Flink 应用程序本地 State 恢复,应用程序 State 本地恢复默认没有开启,可以设置参数 state.backend.local-recovery 值为 true 进行激活。 -
设置 Checkpoint 保存数
。Checkpoint 保存数默认是 1,也就是只保存最新的 Checkpoint 的 State 文件,当进行 State 恢复时,如果最新的 Checkpoint 文件不可用时 (比如文件损坏或者其他原因),那么 State 恢复就会失败,如果设置 Checkpoint 保存数 3,即使最新的 Checkpoint 恢复失败,那么 Flink 也会回滚到上一次 Checkpoint 的状态文件进行恢复。考虑到这种情况,可以通过 state.checkpoints.num-retained 设置 Checkpoint 保存数。
Checkpoint 是 Flink 的失败恢复机制,它的配置对于 Flink 应用程序的性能和稳定性有着至关重要的影响。Checkpoint 需要根据 Flink 应用程序的不同而进行不同的配置,根据相关配置原则,以求达到理想的配置,使 Flink 应用程序性能和稳定性最优化。