Kafka Installation and Performance Testing


Reference (official docs):
http://kafka.apache.org/0102/documentation.html#quickstart

1. Download and extract
tar -zxvf kafka_2.12-0.10.2.1.tgz
cd kafka_2.12-0.10.2.1

2. Start ZooKeeper:

Config file:  [root@rhel64-64bit config]# grep -iv -e '^#' -e '^$' zookeeper.properties 
          dataDir=/tmp/zookeeper
          clientPort=2181
          maxClientCnxns=0
Start it with:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &>/root/zoo-log &

Start Kafka:

Config file:  [root@rhel64-64bit config]# grep -iv -e '^#' -e '^$' server.properties
           broker.id=0
           num.network.threads=3
           num.io.threads=8
           socket.send.buffer.bytes=102400
           socket.receive.buffer.bytes=102400
           socket.request.max.bytes=104857600
           log.dirs=/tmp/kafka-logs
           num.partitions=1
           num.recovery.threads.per.data.dir=1
           log.retention.hours=168
           log.segment.bytes=1073741824
           log.retention.check.interval.ms=300000
           zookeeper.connect=127.0.0.1:2181
           zookeeper.connection.timeout.ms=6000
           listeners=PLAINTEXT://127.0.0.1:9092
Start it with:
bin/kafka-server-start.sh -daemon config/server.properties
or:
nohup bin/kafka-server-start.sh config/server.properties &>/root/kafka-log &

3. Create a topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test3
      List all topics:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
      Show details of a topic:
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test3

[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test3
Topic:test3	PartitionCount:2	ReplicationFactor:1	Configs:    
	Topic: test3	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: test3	Partition: 1	Leader: 0	Replicas: 0	Isr: 0

--- There are 2 partitions, Partition: 0 and Partition: 1; the corresponding test3-0 and test3-1 directories are created under /tmp/kafka-logs.

4. ----- The producer sends messages:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
abc
bcd
   ----- The consumer receives the messages sent by the producer:
( If no group.id is specified, a new group id console-consumer-XXXX is generated each run; --new-consumer is the default. )
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --from-beginning
abc
bcd

5. Topic test3 above was created with --partitions 2, i.e. it has 2 partitions. Let's test the relationship between partitions and consumers:
   within a consumer group, each partition is consumed by exactly one consumer, while one consumer may consume multiple partitions; if there are
   more consumers than partitions, the extra consumers receive no messages.
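This rule can be sketched with a small Python model (a round-robin-style assignment; illustrative only, not the broker's actual assignor code):

```python
def assign_partitions(partitions, consumers):
    """Assign each partition to exactly one consumer in the group,
    round-robin; consumers beyond the partition count stay idle."""
    if not consumers:
        return {}
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 2 partitions, 3 consumers in one group: the third consumer gets nothing
three = assign_partitions([0, 1], ["c1", "c2", "c3"])
# {'c1': [0], 'c2': [1], 'c3': []}

# 3 partitions, 2 consumers: one consumer handles two partitions
two = assign_partitions([0, 1, 2], ["c1", "c2"])
# {'c1': [0, 2], 'c2': [1]}
```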

1> Open two consumer windows:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3  --consumer-property group.id=test-group
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3  --consumer-property group.id=test-group
2> The producer sends 10 messages:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
1     2    3    4    5    6    7     8    9   0

The first consumer window receives:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3  --consumer-property group.id=test-group
2
4
6
8
0
The second consumer window receives:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3 --consumer-property group.id=test-group
1
3
5
7
9
3> List all groups:
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --list  --new-consumer
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --list  --new-consumer
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-46595   
console-consumer-95465
test-group
test-group2
4> Check the group's status:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC  PARTITION CURRENT-OFFSET  LOG-END-OFFSET LAG CONSUMER-ID                                      HOST       CLIENT-ID
test3  0         30              30              0   consumer-1-4d5470d9-da66-4afd-98f5-6d78227eb333 /127.0.0.1 consumer-1
test3  1         30              30              0   consumer-1-c6d1df4d-ec60-4469-ad18-81368efc153e /127.0.0.1 consumer-1
A LAG of 0 means all messages in topic test3 have been consumed by test-group's consumers.
5> Now stop both consumers above and have the producer send 3 more messages:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test3
abc,edc
233,678
eff

[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'test-group' has no active members.
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID    HOST     CLIENT-ID
test3   0          30              31              1     -              -        -
test3   1          30              32              2     -              -        -
This shows partition 0 has 1 unconsumed message and partition 1 has 2.
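The LAG column is simple arithmetic, log-end offset minus the group's committed offset:

```python
def lag(log_end_offset, current_offset):
    """LAG = messages appended to the partition but not yet consumed by this group."""
    return log_end_offset - current_offset

# Values from the describe output above, after the 3 extra messages:
p0_lag = lag(31, 30)               # partition 0 -> 1
p1_lag = lag(32, 30)               # partition 1 -> 2
total_unconsumed = p0_lag + p1_lag # 3, matching the 3 messages just sent
```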

Now start a new consumer in the group; it resumes from the group's last committed offset, i.e. the CURRENT-OFFSET shown above:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3  --consumer-property group.id=test-group
233,678
abc,edc
eff
Note that the order differs from the order the producer sent them in: Kafka only guarantees message ordering within a partition.
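A small Python check illustrates this guarantee (the message-to-partition split below is an assumption, chosen to be consistent with the per-partition LAG counts above):

```python
# Per-partition order is preserved; global order across partitions is not.
p0 = ["233,678"]                  # send order within partition 0 (assumed)
p1 = ["abc,edc", "eff"]           # send order within partition 1 (assumed)
consumed = ["233,678", "abc,edc", "eff"]   # the consumer output above

def keeps_relative_order(partition, stream):
    """True if `partition`'s messages appear in `stream` as a subsequence,
    i.e. in the same relative order."""
    it = iter(stream)
    return all(msg in it for msg in partition)

ok_p0 = keeps_relative_order(p0, consumed)   # True
ok_p1 = keeps_relative_order(p1, consumed)   # True
# A stream that reverses messages *within* a partition would fail the check.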
6> We can also set the consumer's starting offset to pull the messages we want:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test3  --consumer-property group.id=test-group --offset 30 --partition 1 -- read from offset 30 of partition 1

6. Inspect the znodes inside ZooKeeper:
bin/zkCli.sh -server 127.0.0.1:2181  -- enter the ZooKeeper shell
bin/zookeeper-shell.sh 127.0.0.1:2181

help --- list all available commands:
ZooKeeper -server host:port cmd args
	stat path [watch]
	set path data [version]
	ls path [watch]
	delquota [-n|-b] path

ls /  ---- list the top-level znodes
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]

get /brokers/ids/0   ---- view the broker's registration info
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1530676938758","port":9092,"version":4}
cZxid = 0x120
ctime = Wed Jul 04 12:02:18 HKT 2018
mZxid = 0x120
mtime = Wed Jul 04 12:02:18 HKT 2018
pZxid = 0x120
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1646080c2e30004
dataLength = 188
numChildren = 0
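The znode payload is plain JSON, so the broker's registration details can be pulled out directly, e.g. in Python:

```python
import json

# The broker registration payload stored in the /brokers/ids/0 znode:
znode_data = ('{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
              '"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,'
              '"host":"127.0.0.1","timestamp":"1530676938758","port":9092,"version":4}')

info = json.loads(znode_data)
endpoint = info["endpoints"][0]   # PLAINTEXT://127.0.0.1:9092
port = info["port"]               # 9092
```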

If the broker is stopped, its info can no longer be fetched, which shows that the broker registration in ZooKeeper is only an ephemeral znode:
get /brokers/ids/0
Node does not exist: /brokers/ids/0

7. Set up a broker cluster:

1> Config files:
config/server.properties:
    broker.id=0
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=127.0.0.1:2181
    listeners=PLAINTEXT://127.0.0.1:9092

config/server1.properties:
    broker.id=1
    log.dirs=/tmp/kafka1-logs
    zookeeper.connect=127.0.0.1:2181
    listeners=PLAINTEXT://127.0.0.1:9093

config/server2.properties:
    broker.id=2
    log.dirs=/tmp/kafka2-logs
    zookeeper.connect=127.0.0.1:2181
    listeners=PLAINTEXT://127.0.0.1:9096
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server1.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-server-start.sh -daemon config/server2.properties
[root@rhel64-64bit kafka_2.12-0.10.2.1]# netstat -pan|grep 9092

After startup, the three broker nodes are visible in ZooKeeper:
ls /brokers/ids
[0, 1, 2]
2> Create a topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 3 --topic test1
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1   ----- a partition with no leader shows Leader: -1
Topic:test1	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: test1	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: test1	Partition: 1	Leader: 0	Replicas: 0,2	Isr: 0,2
	Topic: test1	Partition: 2	Leader: 1	Replicas: 1,0	Isr: 1,0
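The describe output is tab-separated `key: value` fields, so the leader layout is easy to extract programmatically (the parsing below assumes the 0.10.x output format shown above):

```python
# Parse the per-partition lines of `kafka-topics.sh --describe` output
# into a {partition: leader} map.
describe = (
    "Topic: test1\tPartition: 0\tLeader: 2\tReplicas: 2,1\tIsr: 2,1\n"
    "Topic: test1\tPartition: 1\tLeader: 0\tReplicas: 0,2\tIsr: 0,2\n"
    "Topic: test1\tPartition: 2\tLeader: 1\tReplicas: 1,0\tIsr: 1,0"
)

leaders = {}
for line in describe.splitlines():
    fields = dict(f.split(": ") for f in line.split("\t"))
    leaders[int(fields["Partition"])] = int(fields["Leader"])
# leaders == {0: 2, 1: 0, 2: 1}: leadership is spread across all three brokers
```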
3> Send and read messages:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1
123
1234
123456

The test shows the messages can be read from any of the brokers:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic test1 --from-beginning
1234
123456
123
4> After killing the broker on 9092: partition 1's leader used to be broker.id=0 on 9092, and has now moved to broker.id=2 on 9096:
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1
Topic:test1	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: test1	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: test1	Partition: 1	Leader: 2	Replicas: 0,2	Isr: 2
	Topic: test1	Partition: 2	Leader: 1	Replicas: 1,0	Isr: 1
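The failover can be modeled in a few lines (a simplified sketch only; in real Kafka the controller performs the election):

```python
def elect_leader(replicas, isr, dead_brokers):
    """Pick the first live in-sync replica as the new leader.
    Simplified model of leader election after a broker failure."""
    for broker in replicas:
        if broker in isr and broker not in dead_brokers:
            return broker
    return -1   # no live in-sync replica: describe shows Leader: -1

# Partition 1 of test1: Replicas 0,2 / Isr 0,2; broker 0 (port 9092) is killed:
new_leader = elect_leader([0, 2], {0, 2}, dead_brokers={0})   # -> 2
```

This matches the describe output: partition 1's leadership moves to broker 2, the surviving in-sync replica.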

Consume messages from topic test1 again:
 ---- the test result is that without the --partition argument no messages are consumed; why this happens remains to be investigated.
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic test1 --from-beginning --partition 1

8. Test performance with Kafka's bundled performance-test script:

[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic yoyo
Created topic "yoyo".
[root@rhel64-64bit kafka_2.12-0.10.2.1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic yoyo
Topic:yoyo	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: yoyo	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: yoyo	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: yoyo	Partition: 2	Leader: 1	Replicas: 1	Isr: 1


--num-records 500000: 500,000 records, each --record-size 1000 bytes, ~500 MB in total:
bin/kafka-producer-perf-test.sh --num-records 500000 --topic yoyo --record-size 1000 --throughput 50000 --producer-props bootstrap.servers=127.0.0.1:9092
73036 records sent, 14604.3 records/sec (13.93 MB/sec), 1385.6 ms avg latency, 1996.0 max latency.
131085 records sent, 26211.8 records/sec (25.00 MB/sec), 1171.9 ms avg latency, 1503.0 max latency.
94995 records sent, 18995.2 records/sec (18.12 MB/sec), 1631.8 ms avg latency, 2437.0 max latency.
69405 records sent, 13878.2 records/sec (13.24 MB/sec), 1170.2 ms avg latency, 4569.0 max latency.
500000 records sent, 20747.748869 records/sec (19.79 MB/sec), 1393.22 ms avg latency, 4657.00 ms max latency, 
1284 ms 50th, 3250 ms 95th, 4606 ms 99th, 4649 ms 99.9th.
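The MB/sec figure in the summary line can be cross-checked from records/sec and --record-size:

```python
# Reproduce the tool's reported throughput: MB/sec = records/sec x record
# size, with MB = 1024 * 1024 bytes.
records_per_sec = 20747.748869   # from the final summary line above
record_size = 1000               # bytes, from --record-size

mb_per_sec = records_per_sec * record_size / (1024 * 1024)
# round(mb_per_sec, 2) == 19.79, matching the reported "19.79 MB/sec"
```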

As shown above, this script can be used to measure Kafka's performance under different partition counts and replication factors.


Copyright notice: this is an original article by aryoyo, licensed under CC 4.0 BY-SA; please include a link to the original source and this notice when reposting.