背景描述:
从kafka消费的数据落到Hbase中,源表的数据量较大,因此采用了mutator缓存1000条,或者累积1秒后,一起put。避免短时间内大量访问hbase regionServer,把hbase 干废了。
由于mutator的flush操作是在invoke()方法中触发的,而invoke方法只在数据达到sink算子时,才会被触发。因此在夜间数据比较稀疏时,数据的时效性取决于两条数据到达的时间间隔。
为解决这个问题,在open函数中初始化了一个Timer定时器,设置每秒中调用一次Invoke方法。这样除了数据达到会触发invoke方法,定时器也会触发invoke。
@Override
public void open(Configuration parameters) thorw Exceptions{
initialConnection();
initialMuator();
timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run(){
invoke(new PutVo(), context);
}
},0L,1000L);
}
然而出现了丢数问题。6万笔数据丢了三千多笔。
看日志发现,因为Timer起了并发线程,如下图。理想情况下两个并发的线程顺序执行,互不干扰。然而事实上他们共享了puts对象,出现了在第一个线程中刚把数据加到puts对象中,第二个线程把puts给清空了。
如此以来,出现了丢数。
解决方法:
目前采用对自定义的sink类继承CheckpointedFunction接口,实现其中的snapshotState方法。
checkpointedFunction 是实现operator state 的核心方法,其中定义了两个方法:snapshotState 和 initialState
- snapshotState 在checkpoint的时候会被调用,用于快照状态,通常用于flush、commit、synchronize外部系统
- initializeState 在从状态中恢复时会被调用。
计划在每次checkpoint的时候,flush一次。由于每次进行Checkpoint前,都需要暂停处理新流入数据,然后再开始执行快照。所以不会出现刚刚把数据放到puts里,然后puts被清空的情况。
public class HbaseMuatorSink extends RichSinkFunction<PutVo> implements Serializable, CheckpoitedFunction{
private List<Put> puts = new ArrayList<>();
private BufferedMutator mutator = null;
@Override
public void open() throws Exception{
initialConnection();
initialMuator();
}
@Override
public void invoke(PutVo putVo, Context context){
puts.add(getPut(putVo));
count +=1;
if(count > 1000){
mutator.mutate();
mutator.flush();
puts.clear();
count = 0;
}
}
@Override
public void close() throws Exception{
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw Exception{
mutator.mutate(puts);
mutator.flush();
puts.clear();
count = 0;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}
}
———2023年1月10日
今天回头看,发现两个问题。
一、在初始化BufferedMutator时,使用BufferedMutatorParams除了可以定义缓存大小,还可以定义刷写时间,检查是否要刷写的频率。
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
//设置缓存大小为 10M
params.writeBufferSize(10 * 1024 * 1024L);
//设置缓存自动刷写等待时间1000ms
params.setWriteBufferPeriodicFlushTimeoutMs(1000);
//设置缓存自动刷写检查时间1000ms。(从hbase 4.00起这个方法将被废弃,hbase自己会设置一个全局的检查时间)
params.setWriteBufferPeriodicFlushTimerTickMs(1000);
mutator = connection.getBufferedMutator(params);
如此一来,就不用在snapshot的时候,去做缓存了。
二、对于上面的线程安全问题,flink是个集群,我们提交的任务肯定会在集群中并发执行。那么执行过程中,是否也会有并发导致的数据丢失呢?
flink任务可以设置并行度,并不是每个任务都会并发执行。如果并行度只有1,那么就是在一个slot中顺序执行。如果并行度为 n,那么会启动 n 个slot,每个slot执行一串task。每个slot内是一个进程,slot之间是多个进程,不会共享进程内的对象,因此不会出线程安全的问题。
reference:
聊聊flink的CheckpointedFunction – 简书