Reference (official site):
http://kafka.apache.org/0102/documentation.html#quickstart
1. Download and extract
tar -zxvf kafka_2.12-0.10.2.1.tgz
cd kafka_2.12-0.10.2.1
2. Start ZooKeeper:
Config file: [root@rhel64-64bit config]# grep -iv -e '^#' -e '^$' zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
Start command:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &>/root/zoo-log &
Start Kafka:
Config file: [root@rhel64-64bit config]# grep -iv -e '^#' -e '^$' server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
listeners=PLAINTEXT://127.0.0.1:9092
Start command:
bin/kafka-server-start.sh -daemon config/server.properties
or:
nohup bin/kafka-server-start.sh config/server.properties &>/root/kafka-log &
3. Create a topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test3
List all topics:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
Show details of a topic:
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test3
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test3
Topic:test3 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test3 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
---2 partitions (Partition: 0 and Partition: 1); the corresponding test3-0 and test3-1 directories are created under /tmp/kafka-logs
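How a producer maps a message onto one of the two partitions can be sketched in Python. This is a simplified model, not Kafka's actual implementation: the real default partitioner uses murmur2 on the key bytes and round-robins keyless messages.

```python
def pick_partition(key, num_partitions):
    """Simplified model of Kafka's default partitioner:
    a keyed message always hashes to the same partition.
    (Real Kafka uses murmur2 on the key bytes, not Python's hash.)"""
    return hash(key) % num_partitions

# Messages with the same key always land in the same partition,
# which is what gives Kafka per-key ordering.
assert pick_partition("user-42", 2) == pick_partition("user-42", 2)
```

This is also why adding partitions to an existing topic can change which partition a given key maps to.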
4. Producer sends messages:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
abc
bcd
Consumer receives the messages sent by the producer:
(if group.id is not specified, a new group id console-consumer-XXXX is generated each time; --new-consumer is the default)
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --from-beginning
abc
bcd
5. The topic test3 above was created with --partitions 2, i.e. it has 2 partitions. Let's test the relationship between partitions and consumers:
a partition can be consumed by only one consumer within the same group, while one consumer can consume multiple partitions; if there are more consumers than partitions, the extra consumers receive no messages.
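The assignment rule above can be sketched as a simple round-robin distribution (a simplified model; real Kafka uses a pluggable assignor such as range or round-robin, chosen by partition.assignment.strategy):

```python
def assign_partitions(partitions, consumers):
    """Simplified model of consumer-group partition assignment:
    each partition goes to exactly one consumer in the group;
    extra consumers are left with nothing to consume."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 2 partitions, 2 consumers: one partition each (the test below).
print(assign_partitions([0, 1], ["c1", "c2"]))        # {'c1': [0], 'c2': [1]}
# 2 partitions, 3 consumers: c3 receives no messages.
print(assign_partitions([0, 1], ["c1", "c2", "c3"]))  # {'c1': [0], 'c2': [1], 'c3': []}
```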
1> Open 2 consumer windows:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
2> Producer sends 10 messages:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
1 2 3 4 5 6 7 8 9 0
The first consumer window receives:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
2
4
6
8
0
The second consumer window receives:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
1
3
5
7
9
3> List all groups:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list --new-consumer
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-46595
console-consumer-95465
test-group
test-group2
4> Check the group's status:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test3 0 30 30 0 consumer-1-4d5470d9-da66-4afd-98f5-6d78227eb333 /127.0.0.1 consumer-1
test3 1 30 30 0 consumer-1-c6d1df4d-ec60-4469-ad18-81368efc153e /127.0.0.1 consumer-1
LAG=0 means all messages in topic test3 have been consumed by the consumers of test-group
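The LAG column is just the gap between the end of the log and the group's committed offset, per partition:

```python
def consumer_lag(log_end_offset, current_offset):
    """LAG = LOG-END-OFFSET - CURRENT-OFFSET:
    messages produced to the partition but not yet consumed by the group."""
    return log_end_offset - current_offset

# Matching the describe output above: both partitions fully caught up.
assert consumer_lag(30, 30) == 0
# After 3 more messages land (1 on partition 0, 2 on partition 1):
assert consumer_lag(31, 30) == 1
assert consumer_lag(32, 30) == 2
```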
5> Now close the 2 consumers above and have the producer send 3 more messages:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
abc,edc
233,678
eff
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'test-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test3 0 30 31 1 - - -
test3 1 30 32 2 - - -
This shows partition 0 has 1 unconsumed message and partition 1 has 2.
Now start a new consumer; it resumes from the group's last committed offset, i.e. the CURRENT-OFFSET above:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
233,678
abc,edc
eff
Note that the message order differs from the order the producer sent them in: Kafka only guarantees ordering within a partition, not across partitions.
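A consumer reading two partitions sees each partition's messages in order, but the interleaving between partitions is arbitrary. A small check of that property, using the three messages above:

```python
# Two partitions, each an ordered log (as produced above).
partition0 = ["abc,edc"]          # partition 0, offset 30
partition1 = ["233,678", "eff"]   # partition 1, offsets 30 and 31

# One possible fetch order the consumer may observe:
seen = ["233,678", "abc,edc", "eff"]

def in_partition_order(seen, partition):
    """True if a partition's messages appear in 'seen' in their original order."""
    positions = [seen.index(m) for m in partition]
    return positions == sorted(positions)

# Per-partition order is preserved even though the global order changed.
assert in_partition_order(seen, partition0)
assert in_partition_order(seen, partition1)
```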
6> We can also set the consumer's starting offset to pull exactly the messages we want:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group --offset 30 --partition 1 ----read from offset 30 of partition 1
6. Inspect the znodes in ZooKeeper:
bin/zkCli.sh -server 127.0.0.1:2181 ----enter the ZooKeeper shell, or:
bin/zookeeper-shell.sh 127.0.0.1:2181
help ----list all available commands:
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls / ----list all znodes under the root
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
get /brokers/ids/0 ----view the broker's registration info
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1530676938758","port":9092,"version":4}
cZxid = 0x120
ctime = Wed Jul 04 12:02:18 HKT 2018
mZxid = 0x120
mtime = Wed Jul 04 12:02:18 HKT 2018
pZxid = 0x120
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1646080c2e30004
dataLength = 188
numChildren = 0
If the broker is stopped, its registration info can no longer be fetched, which shows that the broker's registration in ZooKeeper is only an ephemeral znode:
get /brokers/ids/0
Node does not exist: /brokers/ids/0
7. Set up a broker cluster:
1> Config files:
config/server.properties:
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=127.0.0.1:2181
listeners=PLAINTEXT://127.0.0.1:9092
config/server1.properties:
broker.id=1
log.dirs=/tmp/kafka1-logs
zookeeper.connect=127.0.0.1:2181
listeners=PLAINTEXT://127.0.0.1:9093
config/server2.properties:
broker.id=2
log.dirs=/tmp/kafka2-logs
zookeeper.connect=127.0.0.1:2181
listeners=PLAINTEXT://127.0.0.1:9096
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server1.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server2.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# netstat -pan|grep 9092
After startup, three broker nodes are visible in ZooKeeper:
ls /brokers/ids
[0, 1, 2]
2> Create a topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic test1
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1 -----if a partition has no leader, Leader is shown as -1
(note: the sample describe output below appears to have been captured from a topic named yooo created with the same settings)
Topic:yooo PartitionCount:3 ReplicationFactor:2 Configs:
Topic: yooo Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: yooo Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: yooo Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
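The Replicas layout above follows Kafka's round-robin replica placement. A simplified sketch: the start and shift values here are chosen to reproduce the output above, whereas real Kafka randomizes both when the topic is created.

```python
def assign_replicas(num_partitions, num_brokers, replication_factor, start=2, shift=2):
    """Simplified round-robin replica placement: partition p's first replica
    (its preferred leader) is broker (start + p) % num_brokers, and each
    further replica is offset by 'shift'. Real Kafka randomizes start/shift."""
    layout = {}
    for p in range(num_partitions):
        first = (start + p) % num_brokers
        layout[p] = [(first + r * shift) % num_brokers
                     for r in range(replication_factor)]
    return layout

# Reproduces the Replicas column of the describe output above.
print(assign_replicas(3, 3, 2))  # {0: [2, 1], 1: [0, 2], 2: [1, 0]}
```

The point of the spread is that no single broker holds all the leaders or all the replicas.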
3> Send and read messages
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1
123
1234
123456
The test shows that messages can be read from any of the brokers:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic test1 --from-beginning
1234
123456
123
4> After killing the broker on 9092 (broker.id=0): partition 1's leader was previously broker.id=0 on 9092 and has now moved to broker.id=2 on 9096:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1
Topic:yooo PartitionCount:3 ReplicationFactor:2 Configs:
Topic: yooo Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: yooo Partition: 1 Leader: 2 Replicas: 0,2 Isr: 2
Topic: yooo Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1
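The failover above picks the new leader from the partition's in-sync replica set (ISR). A sketch of that rule (simplified; it ignores unclean leader election, which is a separate config):

```python
def elect_leader(replicas, isr, alive):
    """Simplified leader election: the new leader is the first replica
    (in assignment order) that is both in the ISR and still alive."""
    for r in replicas:
        if r in isr and r in alive:
            return r
    return -1  # no leader available; kafka-topics.sh --describe shows Leader: -1

# Partition 1 before the failure: Replicas 0,2 with Leader 0.
# Broker 0 is killed, so broker 2 (the surviving ISR member) takes over:
assert elect_leader(replicas=[0, 2], isr=[2], alive={1, 2}) == 2
# If no in-sync replica survives, the partition is left without a leader:
assert elect_leader(replicas=[0, 2], isr=[0], alive={1, 2}) == -1
```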
Consume messages from topic test1 again:
----test result: without the --partition argument the consumer receives no messages; why this happens needs further investigation
(one likely cause, not verified here: the internal __consumer_offsets topic was created with replication factor 1 while only one broker was running, so some of its partitions became unavailable once a broker was killed)
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic test1 --from-beginning --partition 1
8. Test performance with Kafka's bundled performance-test script:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic yoyo
Created topic "yoyo".
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic yoyo
Topic:yoyo PartitionCount:3 ReplicationFactor:1 Configs:
Topic: yoyo Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: yoyo Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: yoyo Partition: 2 Leader: 1 Replicas: 1 Isr: 1
--num-records 500000: 500k records, each --record-size 1000 bytes, about 500 MB in total:
bin/kafka-producer-perf-test.sh --num-records 500000 --topic yoyo --record-size 1000 --throughput 50000 --producer-props bootstrap.servers=127.0.0.1:9092
73036 records sent, 14604.3 records/sec (13.93 MB/sec), 1385.6 ms avg latency, 1996.0 max latency.
131085 records sent, 26211.8 records/sec (25.00 MB/sec), 1171.9 ms avg latency, 1503.0 max latency.
94995 records sent, 18995.2 records/sec (18.12 MB/sec), 1631.8 ms avg latency, 2437.0 max latency.
69405 records sent, 13878.2 records/sec (13.24 MB/sec), 1170.2 ms avg latency, 4569.0 max latency.
500000 records sent, 20747.748869 records/sec (19.79 MB/sec), 1393.22 ms avg latency, 4657.00 ms max latency,
1284 ms 50th, 3250 ms 95th, 4606 ms 99th, 4649 ms 99.9th.
As shown above, this script can be used to measure Kafka's performance under different partition counts and replication factors.
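The MB/sec figures in the perf output follow directly from records/sec times the record size; checking the final summary line:

```python
def mb_per_sec(records_per_sec, record_size_bytes):
    """Throughput in MB/sec, taking 1 MB = 1024*1024 bytes
    (which matches the figures the perf script prints)."""
    return records_per_sec * record_size_bytes / (1024 * 1024)

# Final summary line above: 20747.748869 records/sec at 1000 bytes/record
assert abs(mb_per_sec(20747.748869, 1000) - 19.79) < 0.01
```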