【Flink】容错机制

  • Post author:
  • Post category:其他




容错机制



检查点



检查点的保存

定期存盘,将状态保存到检查点。

保存的时间点:

source数据源处记录一个offset,当所有子任务都处理完同一个offset处的数据的时候触发checkpoint保存。每个子任务之间保存快照的具体时间可能不太一样。

在这里插入图片描述



从检查点恢复状态

如果发生故障,Flink应用会重启,从最近的一次检查点恢复状态。

遇到故障:

在这里插入图片描述

重启应用:

在这里插入图片描述
读取检查点,重置状态:

在这里插入图片描述

重放数据:整个系统的状态完全回退到了检查点保存完成的那一时刻。

在这里插入图片描述

继续处理数据:

在这里插入图片描述



检查点算法

1.检查点分界线 Checkpoint Barrier

类似于watermark,在数据流中插入一个特殊标记,遇到这个标记,就知道要做一次检查点保存了。

在这里插入图片描述

2.分布式快照算法

在流中插入分界线,可以明确指示触发检查点保存的时间,与watermark相似,是截止时间。

一个上游任务向多个并行下游任务发送出去的时候,需要广播出去;

多个上游任务向同一个下游任务传递时,需要在下游任务重执行“分界线对齐”操作,就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存。

以word count为例:

在这里插入图片描述

(1)JobManager定期向每个TaskManager发出检查点保存指令:

在这里插入图片描述

(2)状态快照保存成功,向下游传递:

在这里插入图片描述

(3)分界线对齐:

在这里插入图片描述

向下游多个并行子任务广播分界线,执行分界线对齐:

在这里插入图片描述

(4)分界线对齐后,保存状态到持久化存储:

在这里插入图片描述

(5)先处理缓存数据,然后正常继续处理:

如果某个算子收到了检查点分界线Barrier ,后面又来数据,先缓存起来,等到检查点保存成功后,这些缓存的数据就可以拿出来一个一个处理,处理的结果可以叠加在状态上。

(6)不对齐的检查点保存方式

由于分界线对齐要求先到达的分区做缓存等待,一定程度上影响处理速度,当出现背压时,下游任务会堆积大量的缓冲数据,检查点可能要很久才能保存完毕。

为了应对这种场景,Flink1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据也保存进检查点。

这样,当遇到一个分区barrier的时候就不需要等待对齐,而是可以直接启动状态的保存了。



检查点配置

两个检查点之间的时间间隔,如果对性能要求高,那得时间间隔长一些,多用来处理数据;如果对实时性要求高,那就时间间隔短一些。

env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));

CheckpointConfig cf = env.getCheckpointConfig();

cf.setCheckpointTimeout(60000L);//超过多少毫秒,就被直接丢弃

cf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 精准一次(EXACTLY_ONCE)还是至少一次(AT_LEAST_ONCE)

cf.enableCheckpointing(1000L);//1s 每隔1s,JobManager就向TaskManager发出一条指令,保存检查点

cf.setMinPauseBetweenCheckpoints(500L); // 500毫秒,检查点的保存需要一些时间,如果用时700毫秒才保存好这个检查点,那一定要等满500ms,才能再次发送检查点保存指令,两次检查点保存的收尾之间不能小于500毫秒

cf.setMaxConcurrentCheckpoints(1); // 最大并发的检查点数量,同时能有几个检查点一起在保存;如果已经配置setMinPauseBetweenCheckpoints,那最大并发保存的检查点数量肯定是1

cf.enableUnalignedCheckpoints();//可以不做分界线对齐,来一个保存一个;要求当前的检查点必须是EXACTLY_ONCE,且最大并发数是1.

cf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 是否开启检查点的外部持久化? 当在web ui上点击取消作业的时候,保留检查点还是删除检查点

cf.setTolerableCheckpointFailureNumber(0);//允许失败多少次,如果是0,说明一次都不允许失败。



保存点savepoint

检查点是自动存盘便于故障恢复,保存点是用户有计划的手动存盘。

  • 版本管理和归档存储
  • 更新Flink版本:创建一个保存点,停掉Flink应用,升级Flink,从保存点重启;
  • 更新应用程序:直接更新应用程序,便于修改bug;
  • 调整并行度:应用运行时发现资源不足或者资源过剩,可以从保存点重启,将应用程序的并行度调大或调小;
  • 暂停应用程序:先把应用暂停,等资源够用了再启动;
  1. 创建保存点:命令行、代码里
  2. 从保存点重启应用



状态一致性

精准一次(EXACTLY_ONCE)还是至少一次(AT_LEAST_ONCE)

cf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 



状态一致性的概念和级别

状态一致性,就是计算结果要保证正确。

每个数据,不应该丢掉也不该重复,不多不少。

遇到故障可以恢复状态,恢复后重新计算,计算结果也是完全正确的。

级别:

最多一次 AT-MOST-ONCE:发生故障时什么都不做,丢就丢了,绝不重复。

至少一次 AT-LEAST-ONCE:事件多次出现不影响最终统计结果。

精确一次 EXACTLY-ONCE:恰好一次是最严格的要求也最难实现。不能丢,不能多。

应用状态的一致性检查点:是Flink故障恢复机制的核心。



端到端的状态一致性



端到端精准一次

数据源得可以重放数据,中间的Flink流处理器,写入Sink出口通道。三个部分都不能有问题。

  • 内部保证:开启checkpoint
  • source端:可重设数据的读取位置
  • sink端:从故障恢复时,数据不会重复写入外部系统:幂等写入、事务写入



输入端保证

可重设数据的读取位置。



输出端保证

幂等写入:一个操作,重复执行多次,只会导致一次结果修改:如同一个key,value写入hash表,重复吸入得到的结果总是一样;对外部系统要求过高。

事务写入:一系列严密的操作,所有操作都必须成功完成,斗则在每个操作的所有更改都会被撤销。事务具有原子性,要么全成功,要么全失败。

构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink任务。

实现方式:预写日志,两阶段提交。


预写日志:

  • 把结果数据先当成状态保存,然后在收到checkpoint完成的通知后,一次性写入sink系统;
  • 对sink系统没有要求,一批搞定;
  • DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink。

只有这批数据真正写入到sink系统后,再向JobManager确认,这时才能认为检查点checkpoint是成功的。如果这批数据没写出去,那这个检查点就认为保存失败,就会重做这批数据。

但是特殊场景:二次确认的时候,还是有可能出故障:

在这批数据真正写入到sink系统后,向JobManager二次确认的时候,发生了故障,认为这个事务没有成功,全部回滚重做,但是这批数据其实已经发送到sink一次了,这就是数据的重复。

这时最少一次。


两阶段提交:


预提交、真正提交。

  • 对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收到的数据添加到事务里;
  • 然后将这些数据写入外部sink系统,但不提交他们,这时只是预提交;
  • 当他收到checkpoint完成的通知时,他才正式提交事务,实现结果的真正写入;
  • 这种方式真正实现了精准一次性,它需要一个提供事务支持的外部sink系统。
  • Flink体用管理TwoPhaseCommitSinkFunction接口。

2PC对外部sink系统的要求:

  • 必须提供事务支持,或sink任务必须能够模拟外部系统上的事务
  • 在checkpoint的时间间隔里,必须能够开启一个事务并接收数据写入
  • 在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务了,那么未提交的数据就会丢失,需要配置。
  • sink任务必须能够在进程失败后恢复事务。
  • 提交事务必须是幂等操作。

在这里插入图片描述



Flink和Kafka连接时的精准一次性保证

整体:

(1)Flink内部:开启检查点

(2)输入端:重新提交偏移量可以重新读取数据,把当前的偏移量保存到检查点里。

(3)输出端:FlinkKafkaProducer可以实现两阶段提交的TwoPhaseCommitSinkFunction接口。

在这里插入图片描述

(1)启动检查点保存

在这里插入图片描述

(2)算子任务对状态做快照:

在这里插入图片描述

(3)Sink任务开启事务,进行预提交:

在这里插入图片描述

sink收到检查点命令后,先保存当前状态,然后向JobManager进行检查点保存确认,然后开辟新的事务2处理后面的事件。然后等到得到JobManager确认所有的检查点都保存好了之后,才将事务1提交到外部系统。

(4)检查点保存完成,提交事务:

在这里插入图片描述

之前的数据虽然已经提交到Kafka,但会被标记为预提交的数据,预提交的数据是不能被消费的。

收到JobManager确认后,Kafka才会把预提交数据标记为已经提交的数据,外部可以消费。

如果发生故障,预提交的数据也会回滚。这些数据也会重新处理。

需要的配置:

Flink中传入EXACTILY-ONCE;

Kafka的隔离级别修改为read_committed;

事务超时配置,Flink的Kafka连接器中配置的超时时间药小于Kafka集群配置的事务最大超时事件。



版权声明:本文为qq_30885821原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。