Flume之常用拦截器

  • Post author:
  • Post category:其他


在Flume中会使用一些拦截器对source中的数据在进入channel之前进行拦截做一些处理,比如过滤掉一些数据,或者加上一些key/value等。可以同时使用多个拦截器,实现不同的功能。

常用的拦截器有时间戳拦截器、主机名拦截器、静态拦截器等。

时间戳拦截器:会在event的Header中添加一个key为timestamp,value为当前时间戳的值。

主机名拦截器:会在event的Header中添加一个key(默认是host,也可通过hostHeader这个属性来自定义key),value是flume agent的主机名或IP(通过useIP这个属性值来定义,如果useIP设置为true(默认值是true),那么value就是用IP值,如果useIP设置为false,那么value就是用主机名)。

静态拦截器:在event的Header中添加自指定的静态的key/value。



案例-时间戳拦截器+主机名拦截器+静态拦截器:

  • 创建自定义conf文件
[root@hadoop01 test_conf]# pwd
/usr/local/wyh/apache-flume-1.8.0-bin/test_conf
[root@hadoop01 test_conf]# cat test-timestamp-interceptor.conf
myagent.sources=mysource1
myagent.channels=mychannel1
myagent.sinks=mysink1

#使用syslogtcp源,会帮我们坚硬某个主机上的某个tcp端口
myagent.sources.mysource1.type=syslogtcp
myagent.sources.mysource1.host=hadoop01
myagent.sources.mysource1.port=8888
#这里使用的是连续的三个拦截器
myagent.sources.mysource1.interceptors=myinterceptor1 myinterceptor2 myinterceptor3
myagent.sources.mysource1.interceptors.myinterceptor1.type=timestamp
#preserveExisting表明当该拦截器的key已经存在时是否将event中原有的key值覆盖掉,如果设置为true,表明不覆盖原有值,即保护原有值,设置为false时,会覆盖掉原来的key值
myagent.sources.mysource1.interceptors.myinterceptor1.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor2.type=host
myagent.sources.mysource1.interceptors.myinterceptor2.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor2.useIP=false
myagent.sources.mysource1.interceptors.myinterceptor2.hostHeader=my_flume_agent_host
myagent.sources.mysource1.interceptors.myinterceptor3.type=static
myagent.sources.mysource1.interceptors.myinterceptor3.preserveExisting=false
myagent.sources.mysource1.interceptors.myinterceptor3.key=wyh-test-interceptor-key
myagent.sources.mysource1.interceptors.myinterceptor3.value=wyh-test-interceptor-value

myagent.channels.mychannel1.type=memory

myagent.sinks.mysink1.type=logger
myagent.sinks.mysink1.maxBytesToLog=200

myagent.sources.mysource1.channels=mychannel1
myagent.sinks.mysink1.channel=mychannel1

  • 启动flume agent
[root@hadoop01 test_conf]# flume-ng agent -c /usr/local/wyh/apache-flume-1.8.0-bin/conf -f /usr/local/wyh/apache-flume-1.8.0-bin/test_conf/test-timestamp-interceptor.conf -n myagent -Dflume.root.logger=INFO,console

启动成功:

  • 发送测试数据

使用tcp发送数据时,需要先安装一下nc的包,才能使用nc命令:

[root@hadoop01 ~]# yum install -y nmap-ncat

发送数据:

[root@hadoop01 ~]# echo "hello,I am testing interceptor on hadoop01~" | nc hadoop01 8888
  • flume agent端验证数据



案例-正则拦截器:

  • 创建自定义conf文件
[root@hadoop01 test_conf]# cat test-regex-interceptor.conf
myagent.sources=mysource1
myagent.channels=mychannel1
myagent.sinks=mysink1

myagent.sources.mysource1.type=syslogtcp
myagent.sources.mysource1.host=hadoop01
myagent.sources.mysource1.port=8888
myagent.sources.mysource1.interceptors=myinterceptor1
myagent.sources.mysource1.interceptors.myinterceptor1.type=regex_filter
#preserveExisting表明当该拦截器的key已经存在时是否将event中原有的key值覆盖掉,如果设置为true,表明不覆盖原有值,即保护原有值,设置为false时,会覆盖掉原来的key值
myagent.sources.mysource1.interceptors.myinterceptor1.regex=^[A-Z].*$
#该属性表明当满足上述的正则时,该event应该被保留下来还是应该被移除掉,设置为false表明留下该event,设置为true表明移除掉该event。
myagent.sources.mysource1.interceptors.myinterceptor1.excludeEvents=false


myagent.channels.mychannel1.type=memory

myagent.sinks.mysink1.type=logger
myagent.sinks.mysink1.maxBytesToLog=200

myagent.sources.mysource1.channels=mychannel1
myagent.sinks.mysink1.channel=mychannel1
  • 启动agent
[root@hadoop01 test_conf]# flume-ng agent -c /usr/local/wyh/apache-flume-1.8.0-bin/conf -f /usr/local/wyh/apache-flume-1.8.0-bin/test_conf/test-regex-interceptor.conf -n myagent -Dflume.root.logger=INFO,console
  • 发送符合正则的测试数据(以大写字母开头的数据)
[root@hadoop01 ~]# echo "Hello regex~" | nc hadoop01 8888
  • flume agent控制台验证数据

  • 发送不符合正则的测试数据(非以大写字母开头的数据)
[root@hadoop01 ~]# echo "hello regex~" | nc hadoop01 8888

此时,控制台上没有获取到发送的数据,说明拦截器有效。

以上就是flume中常用拦截器的简单使用。



版权声明:本文为QYHuiiQ原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。