Flume: uploading monitored logs to Kafka

Installation

  1. Install Flume 1.8, which requires JDK 1.8.
  2. In the conf directory, copy each file ending in .template and drop the suffix (see the sketch below).
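
A minimal sketch of these two steps, assuming Flume 1.8 is unpacked under /opt/apache-flume-1.8.0-bin and a JDK 1.8 lives at /usr/lib/jvm/java-1.8.0 (both paths are assumptions):

cd /opt/apache-flume-1.8.0-bin/conf
cp flume-env.sh.template flume-env.sh
cp flume-conf.properties.template flume-conf.properties
# point the agent at JDK 1.8
echo 'export JAVA_HOME=/usr/lib/jvm/java-1.8.0' >> flume-env.sh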



Connecting to Kafka with an exec source

  1. Configure flume-conf.properties

agent.sources = s1
agent.channels = c1
agent.sinks = k1

# For each one of the sources, the type is defined
agent.sources.s1.type = exec
agent.sources.s1.channels = c1
# tailing an entire directory was tried and failed: events never reached Kafka
agent.sources.s1.command = tail -fn 400 /home/joy/test/abc.log
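
# To collect every file in a directory, Flume 1.7+ provides the TAILDIR source.
# A commented-out sketch; the position-file path and file pattern are assumptions:
# agent.sources.s1.type = TAILDIR
# agent.sources.s1.positionFile = /home/joy/test/taildir_position.json
# agent.sources.s1.filegroups = f1
# agent.sources.s1.filegroups.f1 = /home/joy/test/.*\.log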

# Logger sink left over from the default template; it is ignored because only k1
# is declared in agent.sinks above
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = c1

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

# Also left over from the template: the channel above is named c1, so this
# memoryChannel line has no effect
agent.channels.memoryChannel.capacity = 100

# Kafka sink configuration
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=192.168.1.55:9092
agent.sinks.k1.topic=topic1
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
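
The brokerList, topic and serializer.class properties above are the pre-1.7 names; Flume 1.8 should still accept them for backward compatibility, but the documented Kafka sink properties are prefixed with kafka.. A sketch of the equivalent block, reusing the broker address and topic from above (the batch size and acks values are illustrative assumptions):

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.55:9092
agent.sinks.k1.kafka.topic = topic1
# number of events batched into one Kafka request
agent.sinks.k1.kafka.flumeBatchSize = 20
# wait for the partition leader to acknowledge each send
agent.sinks.k1.kafka.producer.acks = 1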

  2. Start the agent

./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
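
To confirm that lines appended to the tailed file actually reach Kafka, append a test line and watch the topic with the console consumer (a sketch; it assumes Kafka 0.10+ and is run from the Kafka installation directory on 192.168.1.55):

echo "flume test $(date)" >> /home/joy/test/abc.log
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.55:9092 --topic topic1 --from-beginning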



Problems

  1. The channel is full or unexpected failure. The source will try again after 4000 ms


    Possibly caused by too many file events arriving in a burst.

This happens when the file being collected is too large; the deeper cause is that the source's collection rate and the sink's drain rate are not matched. Increasing the keep-alive value (together with the channel capacities) should resolve it:
# maximum number of events the channel can buffer
a1.channels.c1.capacity = 5000
# maximum number of events handed to the sink in one transaction
a1.channels.c1.transactionCapacity = 2000
# seconds a put/take waits on a full/empty channel before failing (keep-alive)
a1.channels.c1.keep-alive = 10
  2. Last read was never committed - resetting position


    https://blog.csdn.net/sinat_34364663/article/details/53116264


    The source-code walkthrough there is good; the error is thrown from readEvent when it resets the offset.
  3. Unable to deliver event. Exception follows.

    org.apache.flume.EventDeliveryException: Failed to publish events
    One thing worth ruling out first is basic broker and topic connectivity; see the check sketched after this list.
  4. [Some errors other people have run into](https://blog.csdn.net/lijinqi1987/article/details/77449889)
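
A quick connectivity check for the "Failed to publish events" case (a sketch; it assumes a Kafka release of this era where kafka-topics.sh still takes --zookeeper, with ZooKeeper on 192.168.1.55:2181; newer releases use --bootstrap-server instead):

./bin/kafka-topics.sh --zookeeper 192.168.1.55:2181 --list
./bin/kafka-console-producer.sh --broker-list 192.168.1.55:9092 --topic topic1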



Copyright notice: this is an original article by weixin_43808675, released under the CC 4.0 BY-SA license; please include a link to the original source and this notice when reposting.