使用flume把数据写入kafka,需要改变flume配置文件中的sinks属性
进入flume安装文件的conf下创建一个
.properties
文件 这里创建 c.properties,并对其进行配置
ak.sources = mysource
ak.channels = mychannel
ak.sinks = mysink
ak.sources.mysource.type = spooldir
ak.sources.mysource.channels = mychannel
ak.sources.mysource.spoolDir =/tmp/logs
ak.sinks.mysink.channel = mychannel
ak.sinks.mysink.channel = mychannel
ak.sinks.mysink.type = org.apache.flume.sink.kafka.KafkaSink
ak.sinks.mysink.kafka.topic = flume-data
ak.sinks.mysink.kafka.bootstrap.servers = localhost:9092
ak.sinks.mysink.kafka.flumeBatchSize = 20
ak.sinks.mysink.kafka.producer.acks = 1
ak.sinks.mysink.kafka.producer.linger.ms = 1
ak.sinks.mysink.kafka.producer.compression.type = snappy
ak.channels.mychannel.type = memory
ak.channels.mychannel.capacity = 10000
ak.channels.mychannel.transactionCapacity = 100
配置完之后进入flume安装目录启动flume
bin/flume-ng agent --conf conf --conf-file conf/c.properties --name ak -Dflume.root.logger=DEBUG,console
进入kafka安装目录启动kafka
//先启动 Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
//在启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties
创建toplic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume-data
新建一个窗口开启消费者等待数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume-data --from-beginning
上面的flume 配置的是监听 /tmp/logs中的数据,所以我们新建一个文件复制到/tmp/logs中
新建test文件 里面写点数据
hadoop ni hao
hello java
hello flume
hello kafka
把test复制到/tmp/logs中
去消费者窗口查看可以看到
[chs@master kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume-data --from-beginning
hadoop ni hao
hadoop ni hao
hello java
hello flume
hello kafka
OK成功
版权声明:本文为mingyunxiaohai原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。