flume 采集写入hbase速度越来越慢_Flume中的事务

  • Post author:
  • Post category:其他


一提到事务,首先就想到的是关系型数据库中的事务,事务一个典型的特征就是将一批操作做成原子性的,要么都成功,要么都失败。

在Flume中一共有两个事务:


  • Put事务:

    在Source到Channel之间

  • Take事务:

    Channel到Sink之间

从Source到Channel过程中,数据在Flume中会被封装成Event对象,也就是一批Event,把这批Event放到一个事务中,把这个事务也就是这批event一次性的放入Channel中。同理,Take事务的时候,也是把这一批event组成的事务统一拿出来到sink放到HDFS上。

Flume中的Put事务

  • 事务开始的时候会调用一个doPut 方法,doPut方法将一批数据放在putList中;

    • putList在向Channel发送数据之前先检查Channel的容量能否放得下,如果放不下一个都不放,只能doRollback;
    • 数据批的大小取决于配置参数batch size的值;
    • putList的大小取决于配置Channel的参数transaction capacity的大小,该参数大小就体现在putList上;(Channel的另一个参数capacity指的是Channel的容量);
  • 数据顺利的放到putList之后,接下来可以调用doCommit方法,把putList中所有的Event放到 Channel 中,成功放完之后就清空putList;

在doCommit提交之后,事务在向Channel存放数据的过程中,事务容易出问题。如Sink取数据慢,而Source放数据速度快,容易造成Channel中数据的积压,如果putList中的数据放不进去,会如何呢?

此时会调用 doRollback 方法,doRollback方法会进行两项操作:将putList清空; 抛出 ChannelException异常。source会捕捉到doRollback抛出的异常,然后source就将刚才的一批数据重新采集,然后重新开始一个新的事务,这就是事务的回滚。

Flume中的 Take 事务

Take事务同样也有takeList,HDFS sink配置有一个batch size,这个参数决定Sink从Channel 取数据的时候一次取多少个,所以该batch size得小于takeList的大小,而takeList的大小取决于 transaction capacity 的大小,同样是channel中的参数。


Take事务流程:

事务开始后

  • doTake方法会将channel中的event剪切到takeList中。如果后面接的是HDFS Sink的话,在把Channel中的event剪切到takeList中的同时也往写入HDFS的IO缓冲流中放一份event(数据写入HDFS是先写入IO缓冲流然后flush到HDFS);
  • 当takeList中存放了batch size 数量的event之后,就会调用doCommit方法,doCommit方法会做两个操作:

    • 针对HDFS Sink,手动调用IO流的flush方法,将IO流缓冲区的数据写入到HDFS磁盘中;
    • 清空takeList中的数据

flush到HDFS的时候组容易出问题。flush到HDFS的时候,可能由于网络原因超时导致数据传输失败,这个时候调用doRollback方法来进行回滚,回滚的时候由于takeList中还有备份数据,所以将takeList中的数据原封不动地还给channel,这时候就完成了事务的回滚。

但是,如果flush到HDFS的时候,数据flush了一半之后出问题了,这意味着已经有一半的数据已经发送到HDFS上面了,现在出了问题,同样需要调用doRollback方法来进行回滚,回滚并没有“一半”之说,它只会把整个takeList中的数据返回给 channel,然后继续进行数据的读写。这样开启下一个事务的时候容易造成

数据重复

的问题。


Flume中的put和take事务流程如下图:

823ebfe283f8b3b79179dcb6e23bafa2.png
Flume中的事务流程示意图



版权声明:本文为weixin_30619981原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。