flume详解

  • Post author:
  • Post category:其他


原博客地址:http://blog.csdn.net/w412692660/article/details/49308921


Flume


依赖关系:


JDK :Flume


源码程序是由



Java



平台编制而成,故此对


JDK


有强制依赖。


HDFS :flume


监听日志文件变化,将日志程序写入存储介质


HDFS



Hbase











HDFS


同属存储介质,用于实时查询,


flume


可藉由


zookeeper


获取


HBASE


访问实例



本次项目


flume


的职责要求:


flume


是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的,高可靠的,分布式等特性,对于分布式日志采集有得天独厚的优势。


此次项目中


flume


是承上启下的作用,上游为



Python



程序分割本服务器日志,下游为将日志存储


HDFS


,期间确保数据安全性及整体应用可用性,故此我们需要在每台日志收集的服务器中安装


flume


程序和


python


分割程序,如下图所示:



日志收集工作流程是




1.OTSWEB


生成相关日志信息,可按小时、天等(现阶段是按天)


2.PYTHON


日志分割程序,依据文件行数对


OTSWEB


日志实施分割为多个文件,同时写入


flume


客户端侦测的文件夹中


3.flume


客户端侦测文件夹变化,对新增文件读取,并提交至其他服务器的


flume


日志存储模块


4.flume


日志存储模块将收到的日志整合,按


5


分钟为时限将日志数据写入


HDFS




日志模块可参考下图所示:




(蓝色





我方应用,灰色





依赖软件,白色





后续扩展)


Flume


程序安装:




依据服务器客观条件,我们应先检测是否有安装


JDK


以及


CDH


,如上述软件已安装完毕,则将软件包解压放置相关目录下。


解压后目录:


bin目录









放置


flume


的启动命令文件。(flume-ng)


conf目录:放置


flume


核心的配置文件,包括


flume


工作脚本(flume-conf.properties用于定制个性化日志收集,主要工作内容)、


flume


运行信息(flume-env.sh用于配置


JDK


等信息)、


flume


日志管理(log4j.properties用于配置


log4j


信息)


Cloudera目录:用于放置


CDH


管理


flume


信息,可以暂时忽略不计,独立


flume


运行,不依赖此包内容


docs目录:是记录


flume


参考文档


lib


目录:是


flume


运行依赖类库


tools目录:是配置log4j直接输出日志到flume集成配置使用的相关


jar





(


我们非是使用此种方式,亦可忽略不计


)




我们需要配置的内容相对简单,仅需要两步骤操作:





一、配置环境变量


vim  /etc/profile


#flume


export FLUME_HOME=/home/grid/flume/apache-flume-xxx-bin


export  FLUME_CONF_DIR=$FLUME_HOME/conf


export  PATH=$PATH:$FLUME_HOME/bin


source  /etc/profile





二、配置


conf/


flume-conf.properties


(


核心


)




配置


conf/


flume-conf.properties是我们工作的核心,也是


flume


需要我们定制个性化服务的支撑点。




配置源一个


JOB


,通常称之为


agent


,其主要包括三个部分,分别是数据入口(sources)、管道(channels)、数据出口(


sink






数据入口(sources)


:


配置数据源信息,我们主要使用SpoolingDirectory Source和Avro Source,其他支持包括Exce Source、NetCat Source、SyslogSource、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。




管道(channels):保障数据的传输高可用及高可靠,有两种分别是基于内存及硬盘,我们为了保证其安全、稳定则使用基于硬盘传输,支持断点续传等功能




数据出口(


sink


):配置数据最终的存储位置,我们主要使用HDFSSink和Avro Sink,其他支持包括Logger Sink、Thrift Sink、IRC Sink、File Roll Sink、NullSink、HBaseSinks、HBaseSink、AsyncHBaseSink、MorphlineSolrSink、ElasticSearchSink、CustomSink


flume-conf配置:




我们使用


flume


主要包括两个部分


agent


完成数据的对接工作,分别是


flume


客户端及


flume


日志存储。







flume




客户端负责



收集


otsweb


应用端的日志,与


otsweb


应用为一对一的关系,强依赖于


otsweb


应用。(


flume


客户端与


python


程序安装于


otsweb


应用服务器)







flume




日志存储



是将


flume


客户端的日志存储到


HDFS


中,同时依据相关配置信息完成文件传输及存储,


flume


客户端与


flume


日志存储是一对多的关系,此方式可以保证并行存储至


HDFS


,不会受到


HDFS


管道流写入的限制。




flume


客户端配置文件(flume-conf.properties):




# TODO


定义数据入口(sources)、管道(channels)、数据出口(


sink




#


定义数据入口(sources)

a1.sources= r1


#


定义两个数据出口(


sink


),此处


sink


数量是与


flume


日志存储数据相呼应,暂定为两个


SINK

a1.sinks= k1 k2


#


定义管道(channels)

a1.channels= c1


#


TODO


配置数据入口(sources)


#


定义数据源类型为监听文件夹

a1.sources.r1.type= spooldir


#


配置对接管道

a1.sources.r1.channels= c1


#


监听目录

a1.sources.r1.spoolDir= /home/grid/jboss/source


#


定义读取至管道后,删除源文件

a1.sources.r1.deletePolicy= immediate


#


定义两个拦截器,分别是添加时间戳和主机名

a1.sources.r1.interceptors=i1 i2


#


定义拦截器为HostInterceptor

a1.sources.r1.interceptors.i1.type= host


#


主机名赋值,此处也可以写其他的

a1.sources.r1.interceptors.i1.hostHeader= host


#


定义拦截器为TimestampInterceptor,默认会向


head


添加时间戳,后续


flume


日志上传的


conf


文件的


HDFS


时间则来源于此

a1.sources.r1.interceptors.i2.type= timestamp


#


TODO


数据出口(


sink




#


定义类型为


AVRO


方式,使用


RPC


方式调用,故此需要端口号

a1.sinks.k1.type= avro


#


配置对接管道

a1.sinks.k1.channel= c1


# flume


日志上传的


flume


组件的


host

a1.sinks.k1.hostname= my1


# flume


日志上传的


flume


组件的端口(与其


flume


日志上传的sources相呼应)

a1.sinks.k1.port= 4141


#


定义类型为


AVRO


方式,使用


RPC


方式调用,故此需要端口号

a1.sinks.k2.type= avro


#


配置对接管道

a1.sinks.k2.channel= c1


# flume


日志上传的


flume


组件的


host

a1.sinks.k2.hostname= my2


# flume


日志存储的


flume


组件的端口(与其


flume


日志存储的sources相呼应)

a1.sinks.k2.port= 4141


# TODO


配置


SINK


组,能够完成数据一个组内数据均衡发送效果


#


定义组名

a1.sinkgroups= g1


#


定义分组的


SINK

a1.sinkgroups.g1.sinks= k1 k2


#


负载均衡

a1.sinkgroups.g1.processor.type= load_balance


#


发生冲突时的强制性重传

a1.sinkgroups.g1.processor.backoff= true


#


传输数据选择机制


,


默认是轮调


,


生产机设置为随机


,


后期优化可

测试

调整

a1.sinkgroups.g1.processor.selector= random


#


TODO


管道(channels)


#


临时存储方式为硬盘

a1.channels.c1.type= file


#


检查传输及断点续传等验证文件,可原理可参考


MR


合并小文件的方式

a1.channels.c1.checkpointDir= /home/grid/flume/tmp/checkpoint


#


传输数据

a1.channels.c1.dataDirs= /home/grid/flume/tmp/data


flume


日志存储配置文件(flume-conf.properties):


# TODO


定义数据入口(sources)、管道(channels)、数据出口(


sink




#


定义数据入口(sources)

agent.sources= r1


#


定义数据出口(


sink



agent.sinks= k1


#


管道(channels)

agent.channels= c1


#


TODO


配置数据入口(sources)


#


定义数据源


AVRO


方式,使用


RPC


方式接收,故此需要端口号

agent.sources.r1.type= avro


#


配置对接管道

agent.sources.r1.channels= c1


#


限制接收数据的发送方


ID,


0.0.0.0是接收任何


IP


,不做限制

agent.sources.r1.bind= 0.0.0.0


#


接收端口(与其


flume


客户端的


sink


相呼应)

agent.sources.r1.port= 4141


#


TODO


数据出口(


sink




#


定义类型为


HDFS


方式存储

agent.sinks.k1.type= hdfs


#


配置对接管道

agent.sinks.k1.channel= c1


#


配置


HDFS


存储目录,此处也可使用


%{host}


主机名

agent.sinks.k1.hdfs.path= hdfs://my1:9000/log/%Y%m%d/%H/


#


设置压缩与非压缩,此处非压缩

agent.sinks.k1.hdfs.fileType=DataStream


#


格式化文件


,


可选“Text” or “Writable”,此处选择文本方式

agent.sinks.k1.hdfs.writeFormat= Text


#


文件头前缀名,亦可增加/%Y%m%d/%H/等属性

agent.sinks.k1.hdfs.filePrefix= processlog


#


读取到指定记录数


,


将一次写入数据存入HDFS中,


0


则表示不启用

agent.sinks.k1.hdfs.rollCount= 0


#


读取到指定


KB


的文件大小,将一次写入数据存入HDFS中,


0


则表示参数不启用

agent.sinks.k1.hdfs.rollSize= 0


#


读取到指定秒数的日志读取,将一次写入数据存入HDFS中,


300





60


×


5





5


分钟)

agent.sinks.k1.hdfs.rollInterval= 300


#


一次事件写入hdfs的个数


agent.sinks.k1.hdfs.batchSize = 1000


0


#


TODO


管道(channels)


#


临时存储方式为硬盘

agent.channels.c1.type= file


#


检查传输及断点续传等验证文件,可原理可参考


MR


合并小文件的方式

agent.channels.c1.checkpointDir= /home/grid/flume/tmp/checkpoint


#


传输数据

agent.channels.c1.dataDirs= /home/grid/flume/tmp/data