Installation
- Install Flume 1.8, which requires JDK 1.8
- In conf/, modify the files that end with the .template suffix so Flume picks them up (see the sketch below)
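A minimal install sketch (assumptions: the apache-flume-1.8.0-bin tarball is already downloaded and a JDK 1.8 is installed; the JAVA_HOME path below is illustrative):
tar -xzf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
# activate the template configs shipped in conf/
cp conf/flume-env.sh.template conf/flume-env.sh
cp conf/flume-conf.properties.template conf/flume-conf.properties
# point Flume at JDK 1.8 (adjust to your own path)
echo 'export JAVA_HOME=/usr/lib/jvm/java-1.8.0' >> conf/flume-env.sh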
Connecting to Kafka, with source type exec
- Configure flume-conf.properties
agent.sources = s1
agent.channels = c1
agent.sinks = k1
# For each one of the sources, the type is defined
agent.sources.s1.type = exec
agent.sources.s1.channels = c1
# Tailing an entire directory with exec failed (events never reached Kafka), so tail a single file instead
agent.sources.s1.command = tail -fn 400 /home/joy/test/abc.log
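# Note (not part of the original config): to follow every log file in a directory instead
# of a single file, Flume 1.7+ ships a TAILDIR source; a sketch with illustrative paths:
# agent.sources.s1.type = TAILDIR
# agent.sources.s1.positionFile = /home/joy/flume/taildir_position.json
# agent.sources.s1.filegroups = fg1
# agent.sources.s1.filegroups.fg1 = /home/joy/test/.*\.log
# (also, tail -F instead of -f survives log rotation)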
# Each sink's type must be defined
# (the logger sink below is leftover from the template and is inactive, since agent.sinks above only declares k1)
agent.sinks.loggerSink.type = logger
# Specify the channel the sink should use
agent.sinks.loggerSink.channel = c1
# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# (the memoryChannel line below is leftover from the template; the active channel c1 is configured above)
agent.channels.memoryChannel.capacity = 100
# Configure the Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=192.168.1.55:9092
agent.sinks.k1.topic=topic1
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
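On Flume 1.8 the brokerList/topic keys above still work but are deprecated aliases; the documented KafkaSink properties are kafka.bootstrap.servers and kafka.topic. An equivalent sketch reusing the broker address and topic from the config above:
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.55:9092
agent.sinks.k1.kafka.topic = topic1
agent.sinks.k1.channel = c1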
- Startup command
./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
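To check the pipeline end to end, one option (an illustrative sketch, assuming a Kafka installation on 192.168.1.55 with the standard console tools) is to append a line to the tailed file and watch the topic:
echo "test line $(date)" >> /home/joy/test/abc.log
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.55:9092 --topic topic1 --from-beginning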
Issues
- The channel is full or unexpected failure. The source will try again after 4000 ms
One possible cause is a sudden burst of data.
It shows up when the file being collected is large; the deeper cause is that the source's collection rate and the sink's drain rate are not matched. Increasing the keep-alive value (together with the channel capacities) should help:
# Maximum number of events the channel can buffer
a1.channels.c1.capacity = 5000
# Maximum number of events the channel hands to the sink (or takes from the source) per transaction
a1.channels.c1.transactionCapacity = 2000
# Seconds a put/take waits on a full/empty channel before timing out
a1.channels.c1.keep-alive = 10
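A related knob worth checking (my assumption, not from the original post): the Kafka sink's batch size should stay at or below the channel's transactionCapacity, otherwise a single sink transaction can never complete. A sketch using the values above:
a1.channels.c1.transactionCapacity = 2000
# flumeBatchSize is the Flume 1.7+ name for the KafkaSink batch size (default 100)
a1.sinks.k1.flumeBatchSize = 1000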
- Last read was never committed – resetting position
https://blog.csdn.net/sinat_34364663/article/details/53116264
The linked post walks through the source code well; the error is thrown from readEvent, which then resets the read offset.
- Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events - [some error reports from others](https://blog.csdn.net/lijinqi1987/article/details/77449889)
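When this exception appears, two quick checks (illustrative commands, not from the original post; the ZooKeeper address is an assumption) are whether the broker port is reachable from the Flume host and whether the topic exists:
# can the Flume host reach the broker port?
nc -vz 192.168.1.55 9092
# does the topic exist? (older Kafka tooling takes --zookeeper, newer takes --bootstrap-server)
bin/kafka-topics.sh --list --zookeeper 192.168.1.55:2181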