葵花宝典–Flume

  • Post author:
  • Post category:其他


一、Flume概述

  • flume是一款高可用、高可靠、分布式的海量日志收集、聚合的传输系统。Flume基于流式框架,灵活简单。
  • Agent:flume的进程,包括source、channel、sink
  • Source:接收数据源,常用netcat、avro、taildir、exec
  • channel:连接source和sink,中间缓冲,常用filechannel、memorychannel和kafkachannel
  • sink:从chanel中拉取数据,写入到对应的目的地,常用hdfssink、kafka sink
  • event:发送数据的最小单位,有headers和body组成,headers内是k-v对。默认为空

二、Flume使用

1、安装:在官网下载对应的tar包,解压就可用,官网为我们提供了相关资料,便于使用时进行查阅

2、配置样例

# 定义agent的名字、source、channel、sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source(查阅官网)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink(查阅官网)
a1.sinks.k1.type = logger

# 配置source
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置source和channel、source和sink连接
#一个sink只能连接一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、启动:flume-ng -n a1 -c 配置文件文件夹 -f 编写的样例位置 -Dflume.root.logger=INFO,console(日志输出到控制台)

三、Flume进阶

1、事务e

put事务:将数据写入putlist,提交时判断channel是否可以写入,如果不可以写入事务回滚,数据需要重新提交

take事务:sink从channel拉取数据放入到takelist中,takelist提交数据写入到目的地,如果写失败了,数据回滚重新写入,在写的过程中takelist不释放资源,只有真正写成功才会释放资源

2、内部原理

数据进入source后,由channelprocessor调用相关的拦截器对数据进行预处理,处理好后发送给channelselector,选择对应的分发策略发送给channel,sink端由sinkprocessor选择相应的分发策略,发送到对应的sink把数据写入到对应的目的地

channelSelector:

  • Replicating:默认副本策略
  • Multiplexing:多路复用(路由策略),需要搭配拦截器使用

SinkProcessor:

  • DefaulatSinkProcessor:一对一
  • LoadBalancingSinkProcessor:负载均衡
  • FailoverSinkProcessor:失败转移

四、自定义拦截器步骤

实现Interceptor接口,重写对应的方法

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if ((body[0] >= 'A' && body[0] <= 'Z') || (body[0] >= 'a' && body[0] <= 'z')){
            event.getHeaders().put("type","zimu");
        }else if (body[0] >= '0' && body[0] <= '9'){
            event.getHeaders().put("type","shuzi");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event: list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public  static  class  Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}

拦截器配置:(搭配多路复用)

#复用
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2


#自定义拦截器
a1.sources.r1.interceptors = i1
#注意:调用的是Interceptor的内部类Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.demo.MyInterceptor$Builder



版权声明:本文为qq_33581278原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。