一、Flume概述
- flume是一款高可用、高可靠、分布式的海量日志收集、聚合的传输系统。Flume基于流式框架,灵活简单。
- Agent:flume的进程,包括source、channel、sink
- Source:接收数据源,常用netcat、avro、taildir、exec
- channel:连接source和sink,中间缓冲,常用filechannel、memorychannel和kafkachannel
- sink:从chanel中拉取数据,写入到对应的目的地,常用hdfssink、kafka sink
- event:发送数据的最小单位,有headers和body组成,headers内是k-v对。默认为空
二、Flume使用
1、安装:在官网下载对应的tar包,解压就可用,官网为我们提供了相关资料,便于使用时进行查阅
2、配置样例
# 定义agent的名字、source、channel、sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source(查阅官网)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink(查阅官网)
a1.sinks.k1.type = logger
# 配置source
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置source和channel、source和sink连接
#一个sink只能连接一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、启动:flume-ng -n a1 -c 配置文件文件夹 -f 编写的样例位置 -Dflume.root.logger=INFO,console(日志输出到控制台)
三、Flume进阶
1、事务e
put事务:将数据写入putlist,提交时判断channel是否可以写入,如果不可以写入事务回滚,数据需要重新提交
take事务:sink从channel拉取数据放入到takelist中,takelist提交数据写入到目的地,如果写失败了,数据回滚重新写入,在写的过程中takelist不释放资源,只有真正写成功才会释放资源
2、内部原理
数据进入source后,由channelprocessor调用相关的拦截器对数据进行预处理,处理好后发送给channelselector,选择对应的分发策略发送给channel,sink端由sinkprocessor选择相应的分发策略,发送到对应的sink把数据写入到对应的目的地
channelSelector:
- Replicating:默认副本策略
- Multiplexing:多路复用(路由策略),需要搭配拦截器使用
SinkProcessor:
- DefaulatSinkProcessor:一对一
- LoadBalancingSinkProcessor:负载均衡
- FailoverSinkProcessor:失败转移
四、自定义拦截器步骤
实现Interceptor接口,重写对应的方法
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
if ((body[0] >= 'A' && body[0] <= 'Z') || (body[0] >= 'a' && body[0] <= 'z')){
event.getHeaders().put("type","zimu");
}else if (body[0] >= '0' && body[0] <= '9'){
event.getHeaders().put("type","shuzi");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event: list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
拦截器配置:(搭配多路复用)
#复用
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
#自定义拦截器
a1.sources.r1.interceptors = i1
#注意:调用的是Interceptor的内部类Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.demo.MyInterceptor$Builder
版权声明:本文为qq_33581278原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。