Using Flume to watch a file and send its data to Kafka



Flume installation: (omitted; see the earlier posts)

Kafka installation: (omitted; see the earlier posts)


Create the Kafka topic


bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 2 --topic test

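[If the environment variables were configured earlier, the Kafka commands can be run directly; otherwise, run them from the Kafka installation directory, where the scripts live under bin. master, slave1, and slave2 are the node names of my cluster; replace them with your own.]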


Consume: open a new terminal window


./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test


1. Kafka needs ZooKeeper to be running before it starts, so cd to the ZooKeeper home directory first


[./bin/zkServer.sh start] Run this on each of the three nodes


2. Start Kafka: cd to the Kafka home directory


./bin/kafka-server-start.sh config/server.properties


3. If the topic does not exist yet, create it


List the existing topics:

bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

Create the topic:

bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 2 --topic test
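The "list, then create only if missing" logic of step 3 can be scripted. The following is a minimal sketch, not part of the original setup: the helper names (list_cmd, create_cmd, ensure_topic) are hypothetical, the flags are the ones used in the commands above, and it assumes the list command prints one topic name per line and that the script is run from the Kafka home directory.

```python
import subprocess

# Assumptions: the ZooKeeper quorum and script path used in this post.
ZK = "master:2181,slave1:2181,slave2:2181"
KAFKA_TOPICS = "bin/kafka-topics.sh"


def list_cmd(zk=ZK):
    """Build the argv for listing topics."""
    return [KAFKA_TOPICS, "--list", "--zookeeper", zk]


def create_cmd(topic, partitions=2, replication=1, zk=ZK):
    """Build the argv for creating a topic."""
    return [KAFKA_TOPICS, "--create", "--zookeeper", zk,
            "--replication-factor", str(replication),
            "--partitions", str(partitions),
            "--topic", topic]


def ensure_topic(topic, run=subprocess.check_output):
    """Create `topic` only if the list command does not already report it.

    `run` takes an argv list and returns the command's stdout as bytes;
    it is a parameter so the logic can be exercised without a cluster.
    Returns True if the create command was issued, False otherwise.
    """
    out = run(list_cmd()).decode("utf-8")
    existing = {line.strip() for line in out.splitlines() if line.strip()}
    if topic in existing:
        return False
    run(create_cmd(topic))
    return True
```

Calling ensure_topic("test") from the Kafka home directory would run the same two commands shown above, skipping the create step when the topic is already listed.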


4. Start Flume with the [tokafka.conf] configuration

bin/flume-ng agent -c conf -f conf/tokafka.conf -n a1 -Dflume.root.logger=INFO,console


[Sample tokafka.conf file]

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /home/data/python/flume_test.txt

# Configure the Kafka sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Set the Kafka broker address and port

a1.sinks.k1.brokerList=master:9092

# Set the Kafka topic (it must match the topic the console consumer reads, which is test in this post)

a1.sinks.k1.topic=test

# Set the serializer

a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1
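A common mistake in a config like this is a source or sink bound to a channel name that was never declared. As a rough sanity check, the sketch below parses the key = value lines and verifies the wiring. It is an illustration only: parse_flume_conf and check_wiring are hypothetical helpers, not part of Flume, and SAMPLE is a shortened copy of the config above.

```python
def parse_flume_conf(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="a1"):
    """Return a list of problems in the source/sink/channel bindings."""
    problems = []
    channels = set(props.get(agent + ".channels", "").split())
    for src in props.get(agent + ".sources", "").split():
        bound = props.get("{}.sources.{}.channels".format(agent, src), "").split()
        if not bound:
            problems.append("source {} is not bound to any channel".format(src))
        for ch in bound:
            if ch not in channels:
                problems.append("source {} uses undeclared channel {}".format(src, ch))
    for sink in props.get(agent + ".sinks", "").split():
        ch = props.get("{}.sinks.{}.channel".format(agent, sink))
        if ch is None:
            problems.append("sink {} is not bound to a channel".format(sink))
        elif ch not in channels:
            problems.append("sink {} uses undeclared channel {}".format(sink, ch))
    return problems


# Shortened copy of the sample config, for illustration.
SAMPLE = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
"""

if __name__ == "__main__":
    # An empty list means every source and sink points at a declared channel.
    print(check_wiring(parse_flume_conf(SAMPLE)))
```

Note the asymmetry the check relies on: a source takes the plural key channels (it may feed several), while a sink takes the singular key channel.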


5. "Write" data into Flume

Because the source in tokafka.conf is configured as

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /home/data/python/flume_test.txt

Flume monitors the file at this path. Whenever data is appended to the file, the exec source picks up the new lines and passes them into Flume.
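To make the behaviour of tail -f concrete, here is a small Python sketch of the same idea: start at the current end of the file and return only what is appended afterwards. This is not how Flume implements the exec source (Flume simply runs the command and reads its output); the function names are hypothetical and the polling interval is an arbitrary choice.

```python
import os
import time


def open_at_end(path):
    """Open the file and skip everything already in it."""
    f = open(path, "r")
    f.seek(0, os.SEEK_END)
    return f


def read_new_lines(f):
    """Return the lines appended since the last call, without newlines."""
    lines = []
    while True:
        line = f.readline()
        if not line:
            break  # reached the current end of file
        lines.append(line.rstrip("\n"))
    return lines


def follow(path, poll_interval=0.5):
    """Yield appended lines forever, polling the file for new data.

    The file is opened (and positioned at its end) when the generator
    is first advanced, not when follow() is called.
    """
    f = open_at_end(path)
    try:
        while True:
            for line in read_new_lines(f):
                yield line
            time.sleep(poll_interval)
    finally:
        f.close()
```

Looping over follow("/home/data/python/flume_test.txt") and printing each line would show roughly what the exec source hands to the channel as the file grows.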

Use [read_write.py] to write the data from another file into /home/data/python/flume_test.txt

[read_write.py] code

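# -*- coding: utf-8 -*-
import random
import time

# Input file to replay, and the file that the Flume exec source tails
readFileName = "/home/data/pro_data"
writeFileName = "/home/data/python/flume_test.txt"

with open(writeFileName, 'a') as wf:
    with open(readFileName, 'r') as f:
        for line in f:
            ss = line.strip()
            # Skip blank lines
            if len(ss) < 1:
                continue
            # Write each line once (the original loop repeated it once per word)
            wf.write(ss + '\n')
            # Flush so that tail -f sees the line immediately
            wf.flush()
            # Pause for a random interval under one second to simulate a live stream
            time.sleep(random.random())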

After these steps, the appended lines show up in the consumer window.



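Copyright notice: This is an original article by zoe_ranxiaosu, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.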