Kafka Topic shell操作+基准测试+java API

  • Post author:
  • Post category:java

Kafka Topic shell操作+基准测试+java API

  • 1- Kafka的相关使用操作

    • shell命令使用
    • Java API的使用
  • 2- Kafka的核心原理:

    • 分片和副本机制

1 消息队列的基本介绍

1.1 消息队列产生背景

什么是消息队列呢?

消息: 数据 只不过这个数据具有一种流动状态
队列: 存储数据的容器 只不过这个容器具有FIFO(先进先出)特性

消息队列: 数据在队列中, 从队列的一端传递到另一端的过程, 数据在整个队列中产生一种流动状态

1.2 常见的消息队列的产品

常见的消息队列的产品:

  • 1- ActiveMQ: 出现时间比较早的一款消息队列的组件, 目前整个社区活跃度非常低, 此软件使用人群也在不断的减少, 此软件在前几年中被Java工程师主要使用
  • 2- RabbitMQ: 目前在Java领域中使用非常频繁的一款消息队列的产品, 其社区活跃度相对不错, 支持多种语言开发
  • 3- RocketMQ: 是由阿里推出一款的消息队列的中间件产品, 目前主要是在阿里系范围内使用, 目前支持的开发语言相对较少一些, 比如成熟的客户端还是JAVA
  • 4- Kafka: 是一款大数据领域下的消息队列的产品, 主要应用在大数据领域在, 在业务领域中使用较少
  • 5- Pulsar: 最近一两年新起的一款消息队列的组件, 也是Aapache顶级开源项目, 目前主要由StreamNative公司进行商业运营中

1.3 消息队列的作用是什么

  • 1- 同步转异步
  • 2- 应用解耦合
  • 3- 流量削峰 : 在秒杀场景中, 突然会有庞大的并发量, 但是过后就没有了
  • 4- 消息驱动系统

1.4 消息队列的两种消费模型

  • 点对点: 数据被生产到容器后, 最终这个数据只能被一个消费方来消费数据
  • 发布订阅: 数据被生产到容器后, 可以被多个消费方同时接收到

2 Kafka的基本介绍

​ Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala

​ 官方网站: http://www.kafka.apache.org

适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序

​ 在实时领域中, 主要是用于流式的数据处理工作

3 Kafka的架构

Kafka Cluster: kafka集群
broker: kafka的节点
producer: 生产者
consumer: 消费者
Topic: 主题/话题 理解就是一个大的逻辑容器(管道)
	shard: 分片. 一个Topic可以被分为N多个分片, 分片的数量与节点数据没有关系
	replicas: 副本, 可以对每一个分片构建多个副本, 副本数量最多和节点数量一致(包含本身) 保证数据不丢失
zookeeper: 存储管理集群的元数据信息

4 Kafka的安装操作

参考Kafka的集群安装文档 完成整个安装工作即可

如果安装后, 无法启动, 可能遇到的问题:

1) 配置文件中忘记修改broker id 
2) 忘记修改监听的地址, 或者修改了但是前面的注释没有打开

如何启动Kafka集群:

启动zookeeper集群: 每个节点都要执行
cd /export/server/zookeeper/bin
./zkServer.sh start

启动完成后 需要通过 jps -m 查看是否启动 , 并且还需要通过:
./zkServer.sh status 查看状态, 必须看到一个leader 两个follower才认为启动成功了

启动Kafka集群: 

单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
前台启动:
	./kafka-server-start.sh ../config/server.properties
后台启动:
	nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
注意: 第一次启动, 建议先前台启动, 观察是否可以正常启动, 如果OK, ctrl +C 退出, 然后挂载到后台

如何停止:

单节点: 每个节点都需要执行
cd /export/server/kafka_2.12-2.4.1/bin
操作:
	jps 然后通过 kill -9
	或者:
	./kafka-server-stop.sh

配置一键化脚本: 仅用于启动Kafka 不会启动zookeeper, zookeeper还是需要单独启动, 或者配置zookeeper的一键化脚本

  • 1- 创建一个onekey目录: node1节点
mkdir -p /export/onekey
  • 2- 将资料中提供的一键化脚本上传到此目录下
cd /export/onekey/
上传即可
  • 3- 对shell脚本赋执行的权限
cd /export/onekey/
chmod 755 *.sh
  • 4- 通过对应的脚本来执行启动和关闭即可

5 Kafka的相关使用

​ Kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作, 所以说学习Kafka主要学习如何使用Kafka生产数据, 以及如何使用Kafka消费数据

5.1 Kafka的shell命令使用

  • 1- 如何创建Topic : kafka-topic.sh
./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
  • 2- 查看当前有那些topic:
./kafka-topics.sh  --list  --zookeeper node1:2181,node2:2181,node3:2181
  • 3- 查看某一个Topic的详细信息:
 ./kafka-topics.sh --describe --zookeeper  node1:2181,node2:2181,node3:2181 --topic test01
  • 4- 如何删除Topic
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
注意: 
	默认情况下, 删除一个topic 仅仅是标记删除, 主要原因: Kafka担心直接删除, 会导致误删数据
	
	如果想执行删除的时候, 直接将topic完整的删除掉: 此时需要在server.properties配置中修改一下配置为True
		delete.topic.enable=true
	
	如果本身Topic中的数据量非常少, 或者没有任何的使用, 此时Topic会自动先执行逻辑删除, 然后在物理删除, 不管是否配置delete.topic.enable
  • 5- 如何修改Topic
Topic 仅允许增大分片, 不允许减少分片 同时也不支持修改副本的数量

增大分区:
./kafka-topics.sh  --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01  --partitions 5
  • 6- 如何模拟生产者: 发送数据
./kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test01
>
  • 7- 如何模拟消费者: 消费数据
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01

默认从当前的时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning  参数即可

5.2 Kafka的基准测试

​ Kafka的基准测试: 主要指的是将安装完成的Kafka集群, 进行测试操作, 测试其能否承载多大的并发量(读写的效率)

​ 注意: 在进行Kafka的基准测试的时候, 受Topic的分片和副本的数量影响会比较大, 一般在测试的时候, 会构建多个topic, 每一个topic设置不同的分片和副本的数量, 比如: 一个设置分片多一些, 副本多一些 一个设置分片多一些, 副本少些…

  • 1- 创建一个Topic
./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
  • 2- 测试写入的数据的效率:
cd /export/server/kafka/bin
./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

属性说明:
	--num-records : 发送的总消息量
	--throughput : 指定吞吐量(限流)  -1 不限制
	--record-size: 每条数据的大小(字节)
    --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 : 设置kafka的地址和消息发送模式
  • 3- 测试读取消息的效率
cd /export/server/kafka/bin
./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000

属性说明:
	--fetch-size : 每次从Kafka端拉取数据量
	--message: 测试的总消息量
假设Kafka的节点数量是无限多的:
	topic分片数量越多, 理论上读写效率越高
	topic副本数量越多, 整体执行效率越差

一般可以将分片的数量设置为副本数量的三倍左右 可以测试出比较最佳的性能  副本调整为1

5.3 Kafka的Java API的操作

  • 1- 创建一个Maven的项目, 导入相关的依赖
	<repositories><!--代码库-->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.6</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

  • 2- 创建两个包结构: com.kafka.producer / com.kafka.consumer

5.3.1 演示如何将数据生产到Kafka

package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) {

        // 第一步: 创建kafka的生产者核心对象: KafkaProducer  传入相关的配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. 执行发送数据操作
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                    "test01", "张三"+i
            );
            producer.send(producerRecord);
        }

        //3. 执行close 释放资源
        producer.close();

    }
}

5.3.2 演示如何从Kafka消费数据

package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1- 创建Kafka的消费者的核心对象: KafkaConsumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "test"); // 消费者组的ID
        props.put("enable.auto.commit", "true"); // 是否自动提交偏移量offset
        props.put("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 订阅topic: 表示消费者从那个topic来消费数据  可以指定多个
        consumer.subscribe(Arrays.asList("test01"));

        while (true) {
            // 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据  相当于空容器)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                // 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
                System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);
            }
        }

    }

}

版权声明:本文为niko_csdn原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。