消息中间件学习笔记(二)——kafka

  • Post author:
  • Post category:其他



目录


一、kafka基本概念


1、kafka核心API


2、topic、partition


3、生产者、消费者


a)、生产者


b)、消费者


4、kafka的应用场景


二、kafka安装和启动


1、安装


2、启动


3、设置多个broker


4、利用Kafka Connect进行导入/导出数据


三、应用


1、生产者


2、消费者


a)、消费者故障


b)、自动提交offset场景示例


c)、手动提交offset场景示例


d)、订阅指定的分区


e)、手动控制消费者位置


f)、动态流量控制


g)、批量消费


3、流处理


四、配置


五、操作


六、安全


1、使用SSL加密和认证


2、SASL验证


a)、使用 SASL/Kerberos 认证


b)、使用SASL/PLAIN认证


七、知识点补充


1、配额


八、后记


一、kafka基本概念

kafka——一个分布式流处理平台,它作为一个集群,运行在一台或多台服务器上,通过topic对存储的流数据进行分类,每个消息包含一个key、一个value和一个timestamp。它并不是JMS规范的实现,在设计实现上与JMS不同。

1、kafka核心API


  • Producer API

    :允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。

  • Consumer API

    :允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。

  • Streams API

    :允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。

  • Connector API

    :允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

2、topic、partition

对于每一个topic,kafka集群都会维持一个分区日志(一个topic的消息分几个区记录),每个分区都是一个有序且顺序不可变的记录集,分区中每个消息都会分配一个唯一标识id表示顺序(称之为offset—偏移量)。

kafka集群对所有已发布的消息,无论其是否被消费,都将保存直至过期(消息保存期限通过参数配置)。事实上,每一个消费者唯一保存的消息元数据是offset,消费的实际上是消息位置,消费之后,保存的offset发生变化,而消息仍然存在可供其他消费者消费。一般来说,消费者消费消息后会线性增加offset(即顺序消费),不过由于offset由消费者控制,所以消费者可以按照任何顺序来进行消费,比如将offset进行重置以重新处理酒数据,或者跳过最近记录从“现在开始消费”。这些细节说明对于kafka来说消费者是非常廉价的,消费者增加或减少对集群或者其他消费者没有太大影响。

每个partition(分区)都会在已配置的kafka集群服务器上进行备份(即对于一个分区P0,每个服务器都会有P0),以提高容错性。每个分区会选择一台服务器作为leader,零台或多台服务器作为followers,leader负责处理对该分区的所有读写操作,followers只需要被动同步leader上该分区的数据。当leader服务器挂掉后,followers会自动选出一台作为新leader。对于拥有N个副本的topic(一个leader,N-1个followers),最多容忍N-1个服务器故障。每台服务器都会成为某些分区的leader和某些分区的follower,实现负载平衡,避免对分区的读写操作一直集中在某几台服务器上。

总结partition的优点:第一,利于扩展。单个分区受限于服务器容量限制,但是一个topic可以有多个分区,因此通过增加分区数量,可以处理无限量的数据。第二,可以作为并行处理的单元。

3、生产者、消费者

a)、生产者

生产者将数据发布到所选topic中。生产者负责选择将数据发布到topic哪个分区中,可以使用循环选择方式实现简单的负载均衡,也可以根据某种算法按照权重选择,开发者自己设计和选择算法。

kafka保证发送到同一topic的同一分区的消息的偏移量按照顺序设置,即在同一分区,早发送的消息的偏移量一定比晚发送的消息的偏移量小。

b)、消费者

消费者使用一个消费组名称进行标识,即kafka中的消费者是一组消费者而不是单个进程,发布到topic的消息将分配给订阅该topic的所有消费组,一个消费组可能有多个消费者实例(便于扩展与容错),对于一个消息和一个消费组来说,每次将仅有一个消费者实例进行消费,同时实现负载平衡。kafka会将分区划分到某一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由Kafka协议动态处理,如果新的实例加入组,新实例将从组中其他成员处接管一些 partition分区,如果一个实例消失,该实例拥有的分区将被分发到剩余的实例。

kafka消费时,只保证分区内的消息有序,而不保证一个topic内不同分区的消息有序,若要使topic上的所有消息有序,可让该topic只拥有一个分区来实现,这也意味着订阅该topic的消费者组只有一个消费者实例。

kafka相比于其他消息中间件有更严格的顺序保证。传统消息中间件在消费队列中消息时,虽然服务器会按照顺序输出消息,但是消息时异步地传递给消费者,在并行消费的情况下,这些消息会无序的到达不同的消费者,即并行消费情况下消息的顺序发生丢失。虽然可以使用“唯一消费者”(即只让一个进程从队列中消费)达到顺序消费的目的,但是这样就意味着不能并行处理数据。在kafka的设计中,topic的分区是一个并行的概念。通过将分区分配给消费者组中的一个消费者实例,确保该消费者实例是该消费者组中分区的唯一读者,达到顺序消费的目的;同时,众多分区保证了消费者实例间的负载均衡,达到了并行消费的目的。需注意,消费者组中的消费者实例数量不能超过分区的数量,否则多出的消费者实例一直处于空等待,不会收到消息。

4、kafka的应用场景

kafka可以作为一个消息系统、存储系统。

kafka也可以用于流处理。流处理持续获取输入topic的消息,进行处理后,写入到输出topic。例如,一个零售APP,接收销售和出货的输入流,统计数量或调整价格后输出。可以直接使用producer和consumer API进行简单的处理。对于复杂的转换,Kafka提供了更强大的Streams API,可构建聚合计算或连接流到一起的复杂应用程序。

二、kafka安装和启动

1、安装

解压kafka文件后,bin/windows下的bat文件是程序运行时的脚本文件,bin下的其他.sh文件是linux需要的shell脚本,在windows环境下运行时不需要。config下是配置文件,启动前需要做一定的修改。

2、启动

  • 首先启动zookeeper,编辑zookeeper.properties,自行设置保存数据目录dataDir,端口默认2181。
  • 启动zookeeper:打开cmd窗口,切换到kafka目录下,命令行输入bin\windows\zookeeper-server-start.bat config\zookeeper.properties运行(笔者在测试时,发现jdk所在路径有空格会报无法加载主类的错误,所以jdk及kafka所在目录最好不要有空格)。
  • 启动kafka:编辑server.properties,设置log.dirs目录,通过bin\windows\kafka-server-start.bat config\server.properties命令启动kafka。(启动时,若jdk版本不对,会报UnsupportedClassVersionError错误)。
  • 创建主题:通过命令bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topicLxwTest创建一个topic。
  • 查看主题:通过命令bin\windows\kafka-topics.bat –list –zookeeper localhost:2181查看topic。
  • 启动生产者:bin\windows\kafka-console-producer.bat –broker-list localhost:9092 –topic topicLxwTest
  • 启动消费者:bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topicLxwTest –from-beginning(注意:kafka高版本中,启动消费者命令中zookeeper项变为了bootstrap-server)

3、设置多个broker

目前我们只是运行了一个broker,下面多运行几个broker。

  • 复制两份config下的server.properties文件,命名为server-1.properties、server-2.properties
  • 修改两个文件中的broker.id、listeners、log.dir,如设置server-1.的broker.id为1,listeners为PLAINTEXT://:9093
  • 之前已运行了zookeeper和一个节点,通过命令再运行两个新节点:bin\windows\kafka-server-start.bat config\server-1.properties
  • 新建一个topic,备份设置为3:bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic topic1
  • 通过describe topics命令查看topic描述:bin\windows\kafka-topics.bat –describe –zookeeper localhost:2181 –topic topic1

  • 通过测试发现,关闭掉作为leader的broker后,消息并没有丢失,因为kafka会将消息在每个broker进行备份,具体测试过程见

    http://www.orchome.com/6

4、利用Kafka Connect进行导入/导出数据

以上是通过命令行进行消息的生产和消费,实际也可能使用其他数据源,或者将kafka中的数据导入到其他源中,这时可以使用Kafka Connect。下面简单演示将txt中的数据导入到kafka topic中,再把topic中的数据导入到另一个文件。

  • 在kafka目录下新建一个文件:test.txt,作为数据来源,在test.txt写几行数据。
  • 查看connect-standalone.properties、connect-file-sink.properties、connect-file-source.properties这几个配置文件,分别配置了连接的节点、输入文件、输出文件、topic等信息,注意bootstrap.servers、topic、file、offset.storage.file.filename等配置信息。
  • 启动zookeeper和kafka(注意所启动kafka的server.properties与上述connect-standalone.properties中的配置对应)。
  • 通过命令运行Kafka Connect:bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties config\connect-file-sink.properties(若报ClosedChannelException错误,打开kafka的server.properties中的listeners与advertised.listeners配置)。
  • 运行后,发现connect-test与指定位置处的test.sink.txt中存入了test.txt的数据,继续在test.txt写入数据,connect-test与test.sink.txt中也写入了相应数据。

三、应用

1、生产者

通过下面描述,在maven工程中引入jar包。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

编写一个简单的生产者示例:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String,String> producer = new KafkaProducer<String, String>(props);

for (int i=0;i<10;i++){
    producer.send(new ProducerRecord<String, String>("topicTest2",Integer.toString(i), Integer.toString(i)));
}
producer.close();
System.out.println("kafka生产者测试结束");

send方法是异步的,会添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率,而不用去阻塞等待每条消息的响应。send方法返回一个Future<RecordMetadata>,RecordMetadata包含消息发送的分区、指定的offset、消息时间戳等信息。如果调用返回的Future的get方法,那么将会使发送阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

也可以使用带有回调参数的send重载方法:send(ProducerRecord, Callback)。发送到同一分区的消息,先发送的消息的Callback一定比后发送的先执行。

producer.send(new ProducerRecord<String, String>("topicTest2", Integer.toString(i)+"b", Integer.toString(i)),
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if(e != null)
                e.printStackTrace();
            System.out.println("The offset of the record we just sent is: " + recordMetadata.offset());
        }
    });

相关异常:

  • InterruptException – 如果线程在阻塞中断
  • SerializationException – 如果key或value不是给定有效配置的serializers
  • TimeoutException – 如果获取元数据或消息分配内存话费的时间超过max.block.ms
  • KafkaException – Kafka有关的错误

2、消费者

kafka的消费者不是线程安全的。

在消费者中与offset相关的两个概念需要区分:

  • 消费者的位置:给出了下一条待消费消息的offset。它比消费者在该分区中看到的最大偏移量要大一个,在每次消费者调用poll(long)接收消息时自动增长。
  • 已提交的位置:已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制(如:commitSync 和 commitAsync)(如此设计,让消费者掌握消息消费的控制权,消费者可以控制一条消息什么时候才被认为是已被消费的)。

a)、消费者故障

Consumer调用poll(long)时,该消费者将自动加入到消费者组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者会向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,分配给该消费者的分区将被重新分配给同一消费者组的其他消费者。通过前面的原理分析可知,一般情况下分区将会按照负载均衡原则自动分配给一个消费者,开发人员也可以通过使用assign(Collection)手动分配指定分区,如果使用手动分配,那么自动动态分区分配和协调消费者组将失效。

还有一种可能,消费可能遇到“活锁”的情况,它是在持续的发送心跳,但是没有处理消息。为了预防消费者在这种情况下一直持有分区,可以使用max.poll.interval.ms活跃检测机制。 配置该项后,如果调用poll的频率过小,则该消费者将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,我们会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。max.poll.interval.ms的大小需要开发者做决策,适当增大可以让消费者有时间处理返回的消息(poll会返回一批消息),过大则会延迟组内的重新平衡。

b)、自动提交offset场景示例

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testConsumerGroupOne");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("topicTest2","topicLxwTest"));//订阅了两个主题
while(true){
    ConsumerRecords<String,String> records= consumer.poll(100);
    for(ConsumerRecord<String,String> record : records){
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

设置enable.auto.commit自动提交偏移量,设置auto.commit.interval.ms控制自动提交的频率。

集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。

c)、手动提交offset场景示例

我们也可以手动控制offset提交,可以让消费者对消息进行一定的逻辑处理,未处理完成前,这个消息就不应该认为是已经消费的,直到完成了整个处理,再去手动提交offset。

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testConsumerGroupOne");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("topicTest2","topicLxwTest"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        //将buffer的数据进行提交或处理
        System.out.println("提交数据");
        consumer.commitSync();//手动提offset
        buffer.clear();
    }
}

上面的例子使用commitSync表示所有收到的消息”已提交”,我们也可以进行更精细的控制:处理完每个分区的消息后,进行该分区偏移量的提交,并且制定一个明确的偏移量。需要注意的是,已提交的offset应始终是程序将读取的下一条消息的offset,因此在程序中对得到的目前的offset进行了+1的处理。

ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}

d)、订阅指定的分区

上面例子中,consumer订阅topic,由kafka自动动态分配该topic的一个或一个以上分区给该消费者,我们也可以通过assign方式直接手动指定topic分区。手动分配分区(即assgin)和动态分配分区的订阅topic模式(即subcribe)不能混合使用,一旦手动分配分区,分区的设置就只能通过调用assign修改。因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。

这种模式的使用场景有可能为:该消费者进程与所订阅的topic的某一个分区在一台系统上,或者“距离”比较近,则可以手动指定分区,而不是让kafka自动分配一个“距离”可能非常远的分区。

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

e)、手动控制消费者位置

大多数情况下,消费者只是简单的从头到尾的消费消息。但是kafka也支持手动控制消费的位置,可以消费之前的消息也可以跳过最近的消息。可以使用seek(TopicPartition, long)方法指定新的消费位置,也可使用seekToBeginning(Collection) 和 seekToEnd(Collection)方法查找服务器保留的最早和最新的offset。

f)、动态流量控制

若果消费者被分配了多个分区,在某些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。例如处理器从2个分区获取消息并把这两个分区的消息进行合并,当其中一个长时间落后另一个,则暂停消费快的那一个,以便落后的赶上来。可以使用pause(Collection)和resume(Collection)来暂停消费指定的分区、重新开始消费指定的分区。

g)、批量消费

可以发现,poll拉取的消息是一批消息而不是一个消息,拉取消息数目和等待时间(等待一定时间内消息数目不够任然拉取消息)可以通过消费者配置项配置。批量消费可以让服务器端减少IO操作,提高效率。

3、流处理

通过下面描述,在maven工程中引入jar包。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.1.0</version>
</dependency>

Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。KafkaStreams类管理Kafka Streams实例的生命周期,在内部,KafkaStreams实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读取和写入。可以通过TopologyBuilder类定义一个计算逻辑处理器,或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。

Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

四、配置

kafka的配置包括Broker、Topic、Producer、Customer、Streams、Connect、AdminClient等的配置,内容较多,详见

消息中间件学习笔记(三)——kafka配置

五、操作


  • 添加topic

    (数据发布到不存在的topic时,系统会自动创建该topic):

    bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –create –topic my_topic_name –partitions 20 –replication-factor 3 –config x=y

    partitions:分区数;replication-factor:备份数(如若设置为3,最多允许两个服务器挂掉而不丢失数据)

  • 修改topic配置:


    bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –alter –topic my_topic_name –partitions 40

  • 增加topic配置项:


    bin\windows\kafka-configs.bat –zookeeper zk_host:port/chroot –entity-type topics –entity-name my_topic_name –alter –add-config x=y

  • 删除topic配置项:


    bin\windows\kafka-configs.bat –zookeeper zk_host:port/chroot –entity-type topics –entity-name my_topic_name –alter –delete-config x

  • 删除topic

    (需要服务器配置delete.topic.enable=true)




    bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –delete –topic my_topic_name

  • 查看所有topic:


    bin\windows\kafka-topics.bat –zookeeper 127.0.0.1:2181 –list

  • 故障后恢复平衡leader:


    Kafka有一个首选副本的概念,如果一个分区的副本列表是1,5,9,节点1将优先被选为leader,因为它较早存在于副本中。当节点1所在服务器关机或故障后,以该服务器作为leader的分区将重新选择节点,这意味着在默认情况下,当这个服务器重启之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。为了避免这种不平衡,可以通过运行以下命令让Kafka集群尝试恢复已经修复正常的服务器的原分区leader地位

    bin\windows\kafka-preferred-replica-election.bat –zookeeper zk_host:port/chroot

    也可以通过配置项auto.leader.rebalance.enable=true让系统自动执行

  • 列出消费者组中所有消费者:


    bin\windows\kafka-consumer-groups.bat –bootstrap-server localhost:9092 –list

  • 查看consumer位置

    (查看消费者群中所有消费者目前的位置):

    bin\windows\kafka-consumer-groups.bat –bootstrap-server localhost:9092 –describe –group my-group

六、安全

kafka目前支持以下安全措施(安全是可选的,kafka支持非安全集群,以及经过身份验证,未认证,加密和未加密客户端的组合):

  • 使用SSL或SASL验证来自客户端(producers和consumers)、其他brokers和工具的连接。Kafka支持以下SASL机制:

    SASL/GSSAPI (Kerberos) – 从版本0.9.0.0开始

    SASL/PLAIN – 从版本0.10.0.0开始

    SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 – 从版本0.10.2.0开始

    SASL/OAUTHBEARER – 从2.0版本开始
  • 验证从brokers 到 ZooKeeper的连接
  • 对brokers与clients之间、brokers之间或brokers与工具之间使用SSL传输对数据加密(注意,启用SSL时性能会下降,其大小取决于CPU类型和JVM实现)。
  • 授权客户端的读写操作
  • 验证是可插拔的,并且支持与外部验证服务的集成

1、使用SSL加密和认证

kafka允许客户端通过SSL方式连接(SSL的介绍见

小知识——SSL介绍

),步骤大致为:生成密钥和证书——建立自己的CA——签名证书——broker配置——客户端配置。生成密钥及证书使用java的keytool,生成CA使用openssl,下面仅根据做简要流程展示(笔者未验证是否可行),若以后实际用到,将另外开文章详细介绍keytool及openssl的使用方法。

  1. 为每个broker生成密钥及证书:

    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey

    可以通过以下命令验证生成的证书:

    keytool -list -v -keystore server.keystore.jks
  2. 通过openssl生成自己的CA:

    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
  3. 将生成的CA添加到“客户端信任存储区”,这样客户端才能详细这个CA:

    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

    注意:若果将Kafka broker配置成需要客户端授权, 可以在Kafka broker配置中设置setting ssl.client.auth属性为“requested”或者“required”。然后同样也给 Kafka broker 提供一个信任存储区,并且这个信任存储区应该拥有所有给客户端密钥签名的 CA 证书。

    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

    客户端的信任存储区(truststore)保存所有客户端需要信任的证书,导入一个证书到一台机器的信任存储区,意味着这台机器将信任所有被这个证书签名的证书。当在大型的 Kafka 集群中部署 SSL 时,这一特征特别有用,你可以用一个单一的 CA 给集群中的所有证书签名,而且可以让所有机器共享信任该 CA 的信任存储区,这样,每个机器都可以认证除自己之外的全部机器。
  4. 用生成的CA为第一步生成的所有证书签名:

    1. ) 首先从密钥库中导出证书:

      keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    2. ) 然后用CA签名证书:

      openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
    3. ) 然后把CA 的证书和被签名的证书一起导入密钥库中:

      keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert

      keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
  5. 配置broker:

    1. ) 配置listeners(若broker 之间的通信没有启用 SSL,listeners中PLAINTEXT和SSL两个协议都需要提供端口信息):

      listeners=PLAINTEXT://host.name:port,SSL://host.name:port
    2. ) 配置以下信息:

      ssl.keystore.location=/var/private/ssl/server.keystore.jks

      ssl.keystore.password=test1234

      ssl.key.password=test1234

      ssl.truststore.location=/var/private/ssl/server.truststore.jks

      ssl.truststore.password=test1234
    3. ) 可选配置:

      ssl.client.auth=none(“required”=>客户端授权是必须的,“requested”=>客户端授权是需要的,但是没有证书依然可以连接。因为“requested”会造成错误的安全感,而且在客户端配置错误的情况下依然可以连接成功,所以不鼓励使用)

      ssl.cipher.suites(指定的密码套件,由授权、加密、MAC和密钥交换算法组成,用于给使用TLS或者SSL协议的网络协商安全设置)

      ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1(列出你将要从客户端接收的SSL协议。注意,SSL已经废弃,支持TLS,并且不建议在生产环境中使用SSL)

      ssl.keystore.type=JKS

      ssl.truststore.type=JKS

      ssl.secure.random.implementation=SHA1PRNG
    4. ) 若想broker间的通信启用 SSL(默认为PLAINTEXT),配置:security.inter.broker.protocol=SSL
    5. ) 用以下命令可以快速验证服务器的 keystore 和 truststore 是否设置正确(TLSv1需要在ssl.enabled.protocols中列出):

      openssl s_client -debug -connect localhost:9093 -tls1
  6. 配置客户端:

    1. ) 若broker不需要客户端授权,下面是最小配置:

      security.protocol=SSL

      ssl.truststore.location=/var/private/ssl/client.truststore.jks

      ssl.truststore.password=test1234
    2. ) 若要客户端授权, 必须像第一步一样创建一个 keystore,同时配置下面这些信息:

      ssl.keystore.location=/var/private/ssl/client.keystore.jks

      ssl.keystore.password=test1234

      ssl.key.password=test1234
    3. ) 若broker设置其他可选配置,客户端也要做相应配置。

2、SASL验证

SASL全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的机制。下面仅对kafka的SASL实现流程做简要介绍,具体概念及详细使用方法将在以后用到时再做学习。

kafka使用JAAS(java的验证和授权api)来完成sasl(需要先大致了解JAAS的原理),使用前先配置kafka的brokers和clients的jaas。在jaas的配置文件中有一些节点名称,其中,

KafkaServer

是broker在jaas文件中的默认节点名称,

Client

是用于认证SASL与zookeeper之间连接的节点,

KafkaClient

是kafka客户端节点。配置时,建立几个XXX.jaas.conf文件(如kafka_server.jaas.conf等),运行kafka server、kafka client时将配置文件作为参数传递给JVM(如-Djava.security.auth.login.config=/etc/kafka/kafka_server.jaas.conf)。客户端在配置jaas时有个例外,可以不按照上面所说的配置文件配置后指定为参数的方式进行配置,而是通过客户端属性sasl.jaas.config进行配置。不同的SASL机制,配置文件也略有不同。

kafka支持以下四种SASL机制(一个broker节点上可以同时使用多种机制):

  • GSSAPI (Kerberos)
  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

kafka使用SASL验证时,需要在server.properties中的listeners添加SASL端口(SASL_PLAINTEXT 或者 SASL_SSL)如:listeners=SASL_PLAINTEXT://host.name:port。如果只配置了这一个端口,需要确保broker之间的通信中使用了相同的协议:security.inter.broker.protocol=SASL_PLAINTEXT(or SASL_SSL)。

a)、使用 SASL/Kerberos 认证

需要用到Kerberos 认证协议服务器、身份验证证书,关键文件有:krb5.conf、xxxx.keytab,关于kerberos的概念及使用,以后实际用到时再做学习。

首先配置broker,建立一个kafka_server.jaas.conf的文件,文件内容示例如下:

KafkaServer {


com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab=”/etc/security/keytabs/kafka_server.keytab”

principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;

useTicketCache=false

};

Client {


com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab=”/etc/security/keytabs/kafka_server.keytab”

principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;

useTicketCache=false

};

在server.properties中确保SASL端口和安全机制配置正确,同时确保配置和Kafka broker证书中名称相匹配(上面示例证书中名称为kafka/kafka1.hostname.com@EXAMPLE.COM,所以sasl.kerberos.service.name为kafka):

listeners=SASL_PLAINTEXT://host.name:port

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=GSSAPI

sasl.enabled.mechanisms=GSSAPI

sasl.kerberos.service.name=kafka

将JAAS和krb5(可选)的文件位置作为JVM参数传递给broker:

-Djava.security.krb5.conf=/etc/kafka/krb5.conf

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

下面配置客户端,同样建立一个kafka_client.jaas.conf配置文件,将JAAS和krb5(可选)的文件位置作为JVM参数传递给客户端所在服务器,同时确保客户端属性配置正确:

KafkaClient {


com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab=”/etc/security/keytabs/kafka_server.keytab”

principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;

useTicketCache=false

};

Client {


com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab=”/etc/security/keytabs/kafka_server.keytab”

principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;

useTicketCache=false

};

security.protocol=SASL_PLAINTEXT

sasl.mechanism=GSSAPI

sasl.kerberos.service.name=kafka

b)、使用SASL/PLAIN认证

SASL/PLAIN是一种简单的username/password认证机制,通常与TLS加密一起使用。不需要Kerberos,同样需要设置JAAS。broker和client设置示例如下:

KafkaServer {


org.apache.kafka.common.security.plain.PlainLoginModule required

username=”admin”

password=”admin-secret”

user_admin=”admin-secret”

user_alice=”alice-secret”;

};

listeners=SASL_SSL://host.name:port

security.inter.broker.protocol=SASL_SSL

sasl.mechanism.inter.broker.protocol=PLAIN

sasl.enabled.mechanisms=PLAIN

KafkaClient {


org.apache.kafka.common.security.plain.PlainLoginModule required

username=”alice”

password=”alice-secret”;

};

security.protocol=SASL_SSL

sasl.mechanism=PLAIN

七、知识点补充

1、配额

从0.9开始,kafka可以对生产、消费客户端进行配额,通过配额来控制客户端使用broker资源。配额的原因是:有些生产/消费客户端可能生成/消费大量数据,从而垄断相关broker的资源,造成网络饱和,对其他客户端和broker本身造成DOS攻击,通过配额可以避免这个问题,这种问题才大型多节点集群中尤为重要。

八、后记

以上是关于kafka的入门学习笔记,还有kafka安全方面的更多应用、connector使用、stream使用等内容,请参考下面的参考链接部分,以后实际用到时再做补充。



参考链接:


http://kafka.apachecn.org



https://www.jianshu.com/p/d3e963ff8b70



http://www.orchome.com/kafka/index



版权声明:本文为lvxiaowei777原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。