Flume的TaildirSource配置路径匹配滚动时间(修改源码)
   
    
    
    一、背景
   
    最近有个需求,要用flume读取腾讯云CDN的日志,由于日志是运维同学直接挂在到本地服务器上的,每次访问目录下的文件都相当于要与腾讯云之间有个交互,会产生大量的访问量,似乎这部分要收费的,所有要尽可能地少访问。
    
    日志路径如下
   
/data/cos_dir/tencentcdnlog/2022/04/02/15
    
    
    目录是按照时间实时滚动增加的,也就是说16点时02目录下会多一个16的文件夹,16文件夹里会放有那些日志文件
    
    需求:现在要按时间每小时读取对应文件夹下的文件
    
    原本用的采集工具是fluentd,它的采集配置本身支持滚动的时间但现在要用flume了,flume本身是不支持父目录里有通配符的,所以我们需要改下源码
    
    fluentd配置如下
   
<source>
@type tail
path /data/cos_dir/tencentcdnlog/%Y/%m/%d/%H/*
pos_file /home/john/logs/td-agent/john_cdn_log.log.pos
tag john_cdn_log.bar
read_from_head true
enable_watch_timer false
<parse>
@type none
</parse>
</source>
<match john_cdn_log.bar>
@type kafka_buffered
# list of seed brokers
brokers xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
# buffer settings
buffer_type file
buffer_path /home/john/logs/td-agent/buffer/john_cdn_log/td
buffer_chunk_limit 256m
buffer_queue_limit 64
# 拉取频率
flush_interval 1s
num_threads 2
# topic settings
default_topic john_cdn_log_input
# data type settings
output_data_type attr:message
compression_codec gzip
# producer settings
max_send_retries 1
kafka_agg_max_bytes 1000000
</match>
    
    
    二、正片(修改TaildirSource源码)
   
    修改完源码后有如下功能:
    
    a、支持带时间通配符的目录
    
    b、支持配置初始化时自动创建不存在的父目录
   
    
    
    1、下载源码
   
    本次修改的是1.9版本
    
    
     官网下载源码包
    
   
https://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-src.tar.gz
https://github.com/apache/flume
    找到taildirSource对应位置
    
     
   
    
    
    2、分析源码
   
    
    
    TaildirSource.java
   
    start()方法,主要做三件事,一、开启一个ReliableTaildirEventReader事件监听者,二、开启个线程去校验文件什么时候关闭,三、开启一个线程去维护position位置文件。
    
    
    configure(Context context)
    
    负责装载配置文件
    
    
    
    process()
    
    负责与channel的交互,把读取到的文件给channel
    
     
   
    
    
    ReliableTaildirEventReader.java
   
    看构造方法
    
    会在这里创建TaildirMatcher对象,每个父路径一个对象,待会我们重点改它
    
    
    
    看updateTailFiles(boolean skipToEnd)方法
    
    这个方法很重要,会遍历每个TaildirMatcher(一个TM相当于一个父目录),更新下面的文件信息
    
    
    
    TaildirSource中的process也会调用它
    
     
   
    
    
    TaildirMatcher.java
   
    看构造方法
    
    这里,每个TaildirMatcher都会保存一个对应的父目录信息,我们要改的也主要是这里
    
    
    
    这还有个getMatchingFiles()用来更新文件
    
     
   
    
    
    3、修改源码
   
    
    
    TaildirMatcher.java
   
    主要是在创建TaildirMatcher时,把输入的路径中含有的时间通配符换成了当前时间对应的准确值,由于TaildirMathcer会在一开始判断下配置中给定的文件路径的父目录是否存在,若不存在会报错,而我测试中用的时间目录是精确到分钟的,所以为了我每次启动时不报错,就在配置中加了个配置,让它一开始检测到父目录不存在时可以自动创建父目录
    
     
   
     
   
    
    
    ReliableTaildirEventReader.java
   
    主要的替换操作就在这了,当需要更新文件时,会去判断TaildirMatcher中的父目录parentDir是否是当前时间的目录,如果不是,就会把parentDir替换成当前时间对应的目录
    
    
    
     
   
    
    
    TaildirSource.java
   
    
    
     
   
    
    
    TaildirSourceConfigurationConstants.java
   
    
    
    改完后install打包,只取taildirsource包,丢到flume的lib目录下就行了
    
    
    
     
   
以上所有操作都是针对带时间通配符的路径的,对其他路径不会有影响,主体思路是比较简单的,就是在每次更新文件的时候去判断对应父目录是否是当前时间的,如果不是那就更新为当前时间的父目录,上面的一些其他代码主要是为了实现我的另外两个操作,一、可以加个配置在开始时自动创建不存在的时间父目录而不报错,二、在时间滚动到下一个时间时,判断下对应父目录是否存在,若不存在则不替换父目录,同时打个不存在的info,但为了避免每5秒的更新都打印这个info我又加了个缓存来记录上一次的路径,从而与当前对比,不同时才打印
    
    
    三、注意
   
    1、由于父目录会更新,导致position游标文件也会被后面目录覆盖
    
    2、上文提到有个默认5秒的更新文件时间,假设路径中的时间最小精度是分,当父目录滚动时,在前一分钟的最后5秒(不一定是)到下一分钟这段时间的数据是否会丢失还没去验证
   
    
    
    四、可能遇到的问题
   
    1、打包时遇到其他组件(flume-ng-morphline-solr-sink)报错,直接从maven中干掉它不打包它就行了
    
    
    
    
    
    2、打包时遇到checkstyle格式不符合规范报错,vm中加点参数,跳过检查就行了
    
     
   
    
    
    3、遇到spotbugs代码错误检查报错,找到对应pom.xml稍微提高下上限就行了,因为不一定是有问题的错。
    
     
   
    
    
    五、其他
   
    
    
    自定义channel
   
    为了方便看channel的数据,顺便自定义了个简陋的Chanenl往本地文件写接受到的source数据
    
    pom.xml
   
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
package com.john.myflumechannel;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.flume.*;
import org.apache.flume.channel.AbstractChannel;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
public class MyFlumeChannel1 extends AbstractChannel {
    private static final Logger LOG = LoggerFactory.getLogger(MyFlumeChannel1.class);
    // 文件的落盘位置
    String outputPath;
    File file;
    public void put(Event event) throws ChannelException {
        Transaction transaction = null;
        try{
            transaction = getTransaction();
            transaction.begin();
            String s = new String(event.getBody());
            // 追加写
            FileUtils.writeStringToFile(file, s + "\n", true);
            if(s.length() > 0){
                LOG.info("接收到一条数据:" + s);
            }
            transaction.commit();
        }catch (Exception e){
            transaction.rollback();
            LOG.info("something is error: " + e.getMessage());
        }finally {
            transaction.close();
        }
    }
    public Event take() throws ChannelException {
        return null;
    }
    public Transaction getTransaction() {
        return new BasicTransactionSemantics() {
            @Override
            protected void doPut(Event event) throws InterruptedException {
            }
            @Override
            protected Event doTake() throws InterruptedException {
                return null;
            }
            @Override
            protected void doCommit() throws InterruptedException {
            }
            @Override
            protected void doRollback() throws InterruptedException {
            }
        };
    }
    public void configure(Context context){
        String outputPath = context.getString("outputPath");
        Preconditions.checkNotNull(outputPath, "outputPath must be set!!");
        this.outputPath = outputPath;
        file = new File(outputPath);
    }
}
打包同样丢到flume的lib下就行了
    
    
    测试时用配置文件
   
# example.conf: A single-node Flume configuration
# Name the components on this agent
mytest.sources = r1 r2
mytest.channels = c1 c2
# Describe/configure the source
mytest.sources.r1.type = TAILDIR
mytest.sources.r1.channels = c1
mytest.sources.r1.filegroups = f1 f2
mytest.sources.r1.filegroups.f1 = /root/john/%Y/%m/%d/%H/%M/.*
mytest.sources.r1.filegroups.f2 = /root/john/2022/04/02/14/05/.*
mytest.sources.r1.positionFile = /root/john/flume/flume-position/myposition1.json
mytest.sources.r1.batchSize = 100
mytest.sources.r1.initTimeParentDir = true
#mytest.channels.c1.type = file
#mytest.channels.c1.checkpointDir = /root/john/flume/checkpoint
#mytest.channels.c1.dataDirs = /root/john/flume/data
mytest.channels.c1.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c1.outputPath = /root/john/flume/myChannel/mydata.log
# Describe/configure the source
mytest.sources.r2.type = TAILDIR
mytest.sources.r2.channels = c2
mytest.sources.r2.filegroups = f1
mytest.sources.r2.filegroups.f1 = /root/john/flume/testlog/.*
mytest.sources.r2.positionFile = /root/john/flume/flume-position/myposition2.json
mytest.sources.r2.batchSize = 100
mytest.channels.c2.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c2.outputPath = /root/john/flume/myChannel/mydata.log
 
