在Flume中会使用一些拦截器对source中的数据在进入channel之前进行拦截做一些处理,比如过滤掉一些数据,或者加上一些key/value等。可以同时使用多个拦截器,实现不同的功能。
常用的拦截器有时间戳拦截器、主机名拦截器、静态拦截器等。
时间戳拦截器:会在event的Header中添加一个key为timestamp,value为当前时间戳的值。
主机名拦截器:会在event的Header中添加一个key(默认是host,也可通过hostHeader这个属性来自定义key),value是flume agent的主机名或IP(通过useIP这个属性值来定义,如果useIP设置为true(默认值是true),那么value就是用IP值,如果useIP设置为false,那么value就是用主机名)。
静态拦截器:在event的Header中添加自指定的静态的key/value。
案例-时间戳拦截器+主机名拦截器+静态拦截器:
- 创建自定义conf文件
[root@hadoop01 test_conf]# pwd
/usr/local/wyh/apache-flume-1.8.0-bin/test_conf
[root@hadoop01 test_conf]# cat test-timestamp-interceptor.conf
myagent.sources=mysource1
myagent.channels=mychannel1
myagent.sinks=mysink1
#使用syslogtcp源,会帮我们坚硬某个主机上的某个tcp端口
myagent.sources.mysource1.type=syslogtcp
myagent.sources.mysource1.host=hadoop01
myagent.sources.mysource1.port=8888
#这里使用的是连续的三个拦截器
myagent.sources.mysource1.interceptors=myinterceptor1 myinterceptor2 myinterceptor3
myagent.sources.mysource1.interceptors.myinterceptor1.type=timestamp
#preserveExisting表明当该拦截器的key已经存在时是否将event中原有的key值覆盖掉,如果设置为true,表明不覆盖原有值,即保护原有值,设置为false时,会覆盖掉原来的key值
myagent.sources.mysource1.interceptors.myinterceptor1.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor2.type=host
myagent.sources.mysource1.interceptors.myinterceptor2.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor2.useIP=false
myagent.sources.mysource1.interceptors.myinterceptor2.hostHeader=my_flume_agent_host
myagent.sources.mysource1.interceptors.myinterceptor3.type=static
myagent.sources.mysource1.interceptors.myinterceptor3.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor3.key=wyh-test-interceptor-key
myagent.sources.mysource1.interceptors.myinterceptor3.value=wyh-test-interceptor-value
myagent.channels.mychannel1.type=memory
myagent.sinks.mysink1.type=logger
myagent.sinks.mysink1.maxBytesToLog=200
myagent.sources.mysource1.channels=mychannel1
myagent.sinks.mysink1.channel=mychannel1
- 启动flume agent
[root@hadoop01 test_conf]# flume-ng agent -c /usr/local/wyh/apache-flume-1.8.0-bin/conf -f /usr/local/wyh/apache-flume-1.8.0-bin/test_conf/test-timestamp-interceptor.conf -n myagent -Dflume.root.logger=INFO,console
启动成功:
- 发送测试数据
使用tcp发送数据时,需要先安装一下nc的包,才能使用nc命令:
[root@hadoop01 ~]# yum install -y nmap-ncat
发送数据:
[root@hadoop01 ~]# echo "hello,I am testing interceptor on hadoop01~" | nc hadoop01 8888
- flume agent端验证数据
案例-正则拦截器:
- 创建自定义conf文件
[root@hadoop01 test_conf]# cat test-regex-interceptor.conf
myagent.sources=mysource1
myagent.channels=mychannel1
myagent.sinks=mysink1
myagent.sources.mysource1.type=syslogtcp
myagent.sources.mysource1.host=hadoop01
myagent.sources.mysource1.port=8888
myagent.sources.mysource1.interceptors=myinterceptor1
myagent.sources.mysource1.interceptors.myinterceptor1.type=regex_filter
#preserveExisting表明当该拦截器的key已经存在时是否将event中原有的key值覆盖掉,如果设置为true,表明不覆盖原有值,即保护原有值,设置为false时,会覆盖掉原来的key值
myagent.sources.mysource1.interceptors.myinterceptor1.regex=^[A-Z].*$
#该属性表明当满足上述的正则时,该event应该被保留下来还是应该被移除掉,设置为false表明留下该event,设置为true表明移除掉该event。
myagent.sources.mysource1.interceptors.myinterceptor1.excludeEvents=false
myagent.channels.mychannel1.type=memory
myagent.sinks.mysink1.type=logger
myagent.sinks.mysink1.maxBytesToLog=200
myagent.sources.mysource1.channels=mychannel1
myagent.sinks.mysink1.channel=mychannel1
- 启动agent
[root@hadoop01 test_conf]# flume-ng agent -c /usr/local/wyh/apache-flume-1.8.0-bin/conf -f /usr/local/wyh/apache-flume-1.8.0-bin/test_conf/test-regex-interceptor.conf -n myagent -Dflume.root.logger=INFO,console
- 发送符合正则的测试数据(以大写字母开头的数据)
[root@hadoop01 ~]# echo "Hello regex~" | nc hadoop01 8888
- flume agent控制台验证数据
- 发送不符合正则的测试数据(非以大写字母开头的数据)
[root@hadoop01 ~]# echo "hello regex~" | nc hadoop01 8888
此时,控制台上没有获取到发送的数据,说明拦截器有效。
以上就是flume中常用拦截器的简单使用。
版权声明:本文为QYHuiiQ原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。