kafka下载地址:
https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
前期准备:
cent os7 安装zookeeper3.6.0:
https://blog.csdn.net/weixin_39816740/article/details/104995674
Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
上传到目录/usr/local/java/下(rz,sz)解压
解压 tar -zxvf kafka_2.13-2.6.0.tgz
创建存放kafka日志目录 mkdir kafkalogs
跟zookeeper一样修改下文件名字 mv kafka_2.13-2.6.0 kafka
复制2份分别叫kafka2,kafka3
cp -r kafka kafka2
cp -r kafka kafka3
修改下文件名字,不小心整错了:
分别修改修改配置config/server.properties(3个kafka中的配置)
我的zookeeper的连接端口:10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181
日志文件路经:/usr/local/java/kafka/kafkalogs
vim kafka/config/server.properties
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.108.3.61:9091 #监听的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9091
log.dirs=/usr/local/java/kafka/kafkalogs #消息存放的目录
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #设置zookeeper的连接端口
另外两台机器同样配置,不过注意修改broker.id和ip和端口。
vim kafka2/config/server.properties
broker.id=2 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.108.3.61:9092 #监听的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9092
log.dirs=/usr/local/java/kafka2/kafkalogs #消息存放的目录
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #设置zookeeper的连接端口
vim kafka3/config/server.properties
broker.id=3 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.108.3.61:9093 #监听的IP和端口
advertised.listeners=PLAINTEXT://10.108.3.61:9093
log.dirs=/usr/local/java/kafka3/kafkalogs #消息存放的目录
zookeeper.connect=10.108.3.61:2183,10.108.3.61:2182,10.108.3.61:2181 #设置zookeeper的连接端口
查看启动的zookeeper状态
依次启动 kafka
kafka/bin/kafka-server-start.sh kafka/config/server.properties &
kafka2/bin/kafka-server-start.sh kafka2/config/server.properties &
kafka3/bin/kafka-server-start.sh kafka3/config/server.properties &
查看是否启动成功
netstat -anp|grep 9091
netstat -anp|grep 9092
netstat -anp|grep 9093
创建主题(在kafka目录中)
kafka/bin/kafka-topics.sh –create –zookeeper 10.108.3.61:2181 –replication-factor 1 –partitions 1 –topic 主题名称
查看已有的主题
kafka/bin/kafka-topics.sh –list –zookeeper 10.108.3.61:2181
启动生产者做消息发布
kafka/bin/kafka-console-producer.sh –broker-list 10.108.3.61:9091 –topic ceshi
发布消息
另开一个窗口订阅消息并开启消费者(注意都是在kafka目录)
开启消费者订阅消息:
kafka/bin/kafka-console-consumer.sh –bootstrap-server 10.108.3.61:9091 –from-beginning –topic ceshi
大功告成!
删除topic
kafka/bin/kafka-topics.sh –delete –zookeeper 10.108.3.61:2181 –topic ceshi
查看删除成功