目录
一、kafka基本概念
kafka——一个分布式流处理平台,它作为一个集群,运行在一台或多台服务器上,通过topic对存储的流数据进行分类,每个消息包含一个key、一个value和一个timestamp。它并不是JMS规范的实现,在设计实现上与JMS不同。
1、kafka核心API
-
Producer API
:允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。 -
Consumer API
:允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。 -
Streams API
:允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。 -
Connector API
:允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
2、topic、partition
对于每一个topic,kafka集群都会维持一个分区日志(一个topic的消息分几个区记录),每个分区都是一个有序且顺序不可变的记录集,分区中每个消息都会分配一个唯一标识id表示顺序(称之为offset—偏移量)。
kafka集群对所有已发布的消息,无论其是否被消费,都将保存直至过期(消息保存期限通过参数配置)。事实上,每一个消费者唯一保存的消息元数据是offset,消费的实际上是消息位置,消费之后,保存的offset发生变化,而消息仍然存在可供其他消费者消费。一般来说,消费者消费消息后会线性增加offset(即顺序消费),不过由于offset由消费者控制,所以消费者可以按照任何顺序来进行消费,比如将offset进行重置以重新处理酒数据,或者跳过最近记录从“现在开始消费”。这些细节说明对于kafka来说消费者是非常廉价的,消费者增加或减少对集群或者其他消费者没有太大影响。
每个partition(分区)都会在已配置的kafka集群服务器上进行备份(即对于一个分区P0,每个服务器都会有P0),以提高容错性。每个分区会选择一台服务器作为leader,零台或多台服务器作为followers,leader负责处理对该分区的所有读写操作,followers只需要被动同步leader上该分区的数据。当leader服务器挂掉后,followers会自动选出一台作为新leader。对于拥有N个副本的topic(一个leader,N-1个followers),最多容忍N-1个服务器故障。每台服务器都会成为某些分区的leader和某些分区的follower,实现负载平衡,避免对分区的读写操作一直集中在某几台服务器上。
总结partition的优点:第一,利于扩展。单个分区受限于服务器容量限制,但是一个topic可以有多个分区,因此通过增加分区数量,可以处理无限量的数据。第二,可以作为并行处理的单元。
3、生产者、消费者
a)、生产者
生产者将数据发布到所选topic中。生产者负责选择将数据发布到topic哪个分区中,可以使用循环选择方式实现简单的负载均衡,也可以根据某种算法按照权重选择,开发者自己设计和选择算法。
kafka保证发送到同一topic的同一分区的消息的偏移量按照顺序设置,即在同一分区,早发送的消息的偏移量一定比晚发送的消息的偏移量小。
b)、消费者
消费者使用一个消费组名称进行标识,即kafka中的消费者是一组消费者而不是单个进程,发布到topic的消息将分配给订阅该topic的所有消费组,一个消费组可能有多个消费者实例(便于扩展与容错),对于一个消息和一个消费组来说,每次将仅有一个消费者实例进行消费,同时实现负载平衡。kafka会将分区划分到某一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由Kafka协议动态处理,如果新的实例加入组,新实例将从组中其他成员处接管一些 partition分区,如果一个实例消失,该实例拥有的分区将被分发到剩余的实例。
kafka消费时,只保证分区内的消息有序,而不保证一个topic内不同分区的消息有序,若要使topic上的所有消息有序,可让该topic只拥有一个分区来实现,这也意味着订阅该topic的消费者组只有一个消费者实例。
kafka相比于其他消息中间件有更严格的顺序保证。传统消息中间件在消费队列中消息时,虽然服务器会按照顺序输出消息,但是消息时异步地传递给消费者,在并行消费的情况下,这些消息会无序的到达不同的消费者,即并行消费情况下消息的顺序发生丢失。虽然可以使用“唯一消费者”(即只让一个进程从队列中消费)达到顺序消费的目的,但是这样就意味着不能并行处理数据。在kafka的设计中,topic的分区是一个并行的概念。通过将分区分配给消费者组中的一个消费者实例,确保该消费者实例是该消费者组中分区的唯一读者,达到顺序消费的目的;同时,众多分区保证了消费者实例间的负载均衡,达到了并行消费的目的。需注意,消费者组中的消费者实例数量不能超过分区的数量,否则多出的消费者实例一直处于空等待,不会收到消息。
4、kafka的应用场景
kafka可以作为一个消息系统、存储系统。
kafka也可以用于流处理。流处理持续获取输入topic的消息,进行处理后,写入到输出topic。例如,一个零售APP,接收销售和出货的输入流,统计数量或调整价格后输出。可以直接使用producer和consumer API进行简单的处理。对于复杂的转换,Kafka提供了更强大的Streams API,可构建聚合计算或连接流到一起的复杂应用程序。
二、kafka安装和启动
1、安装
- 安装jdk
-
下载kafka后解压(高版本内置zookeeper,低版本需要先下载zookeeper),注意不要下载成源码(下载地址:
http://kafka.apache.org/downloads.html
)
解压kafka文件后,bin/windows下的bat文件是程序运行时的脚本文件,bin下的其他.sh文件是linux需要的shell脚本,在windows环境下运行时不需要。config下是配置文件,启动前需要做一定的修改。
2、启动
- 首先启动zookeeper,编辑zookeeper.properties,自行设置保存数据目录dataDir,端口默认2181。
- 启动zookeeper:打开cmd窗口,切换到kafka目录下,命令行输入bin\windows\zookeeper-server-start.bat config\zookeeper.properties运行(笔者在测试时,发现jdk所在路径有空格会报无法加载主类的错误,所以jdk及kafka所在目录最好不要有空格)。
- 启动kafka:编辑server.properties,设置log.dirs目录,通过bin\windows\kafka-server-start.bat config\server.properties命令启动kafka。(启动时,若jdk版本不对,会报UnsupportedClassVersionError错误)。
- 创建主题:通过命令bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topicLxwTest创建一个topic。
- 查看主题:通过命令bin\windows\kafka-topics.bat –list –zookeeper localhost:2181查看topic。
- 启动生产者:bin\windows\kafka-console-producer.bat –broker-list localhost:9092 –topic topicLxwTest
- 启动消费者:bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topicLxwTest –from-beginning(注意:kafka高版本中,启动消费者命令中zookeeper项变为了bootstrap-server)
3、设置多个broker
目前我们只是运行了一个broker,下面多运行几个broker。
- 复制两份config下的server.properties文件,命名为server-1.properties、server-2.properties
- 修改两个文件中的broker.id、listeners、log.dir,如设置server-1.的broker.id为1,listeners为PLAINTEXT://:9093
- 之前已运行了zookeeper和一个节点,通过命令再运行两个新节点:bin\windows\kafka-server-start.bat config\server-1.properties
- 新建一个topic,备份设置为3:bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic topic1
- 通过describe topics命令查看topic描述:bin\windows\kafka-topics.bat –describe –zookeeper localhost:2181 –topic topic1
-
通过测试发现,关闭掉作为leader的broker后,消息并没有丢失,因为kafka会将消息在每个broker进行备份,具体测试过程见
http://www.orchome.com/6
4、利用Kafka Connect进行导入/导出数据
以上是通过命令行进行消息的生产和消费,实际也可能使用其他数据源,或者将kafka中的数据导入到其他源中,这时可以使用Kafka Connect。下面简单演示将txt中的数据导入到kafka topic中,再把topic中的数据导入到另一个文件。
- 在kafka目录下新建一个文件:test.txt,作为数据来源,在test.txt写几行数据。
- 查看connect-standalone.properties、connect-file-sink.properties、connect-file-source.properties这几个配置文件,分别配置了连接的节点、输入文件、输出文件、topic等信息,注意bootstrap.servers、topic、file、offset.storage.file.filename等配置信息。
- 启动zookeeper和kafka(注意所启动kafka的server.properties与上述connect-standalone.properties中的配置对应)。
- 通过命令运行Kafka Connect:bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties config\connect-file-sink.properties(若报ClosedChannelException错误,打开kafka的server.properties中的listeners与advertised.listeners配置)。
- 运行后,发现connect-test与指定位置处的test.sink.txt中存入了test.txt的数据,继续在test.txt写入数据,connect-test与test.sink.txt中也写入了相应数据。
三、应用
1、生产者
通过下面描述,在maven工程中引入jar包。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
编写一个简单的生产者示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<String, String>(props);
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String, String>("topicTest2",Integer.toString(i), Integer.toString(i)));
}
producer.close();
System.out.println("kafka生产者测试结束");
send方法是异步的,会添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率,而不用去阻塞等待每条消息的响应。send方法返回一个Future<RecordMetadata>,RecordMetadata包含消息发送的分区、指定的offset、消息时间戳等信息。如果调用返回的Future的get方法,那么将会使发送阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。
也可以使用带有回调参数的send重载方法:send(ProducerRecord, Callback)。发送到同一分区的消息,先发送的消息的Callback一定比后发送的先执行。
producer.send(new ProducerRecord<String, String>("topicTest2", Integer.toString(i)+"b", Integer.toString(i)),
new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + recordMetadata.offset());
}
});
相关异常:
- InterruptException – 如果线程在阻塞中断
- SerializationException – 如果key或value不是给定有效配置的serializers
- TimeoutException – 如果获取元数据或消息分配内存话费的时间超过max.block.ms
- KafkaException – Kafka有关的错误
2、消费者
kafka的消费者不是线程安全的。
在消费者中与offset相关的两个概念需要区分:
- 消费者的位置:给出了下一条待消费消息的offset。它比消费者在该分区中看到的最大偏移量要大一个,在每次消费者调用poll(long)接收消息时自动增长。
- 已提交的位置:已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制(如:commitSync 和 commitAsync)(如此设计,让消费者掌握消息消费的控制权,消费者可以控制一条消息什么时候才被认为是已被消费的)。
a)、消费者故障
Consumer调用poll(long)时,该消费者将自动加入到消费者组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者会向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,分配给该消费者的分区将被重新分配给同一消费者组的其他消费者。通过前面的原理分析可知,一般情况下分区将会按照负载均衡原则自动分配给一个消费者,开发人员也可以通过使用assign(Collection)手动分配指定分区,如果使用手动分配,那么自动动态分区分配和协调消费者组将失效。
还有一种可能,消费可能遇到“活锁”的情况,它是在持续的发送心跳,但是没有处理消息。为了预防消费者在这种情况下一直持有分区,可以使用max.poll.interval.ms活跃检测机制。 配置该项后,如果调用poll的频率过小,则该消费者将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,我们会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。max.poll.interval.ms的大小需要开发者做决策,适当增大可以让消费者有时间处理返回的消息(poll会返回一批消息),过大则会延迟组内的重新平衡。
b)、自动提交offset场景示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testConsumerGroupOne");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("topicTest2","topicLxwTest"));//订阅了两个主题
while(true){
ConsumerRecords<String,String> records= consumer.poll(100);
for(ConsumerRecord<String,String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
设置enable.auto.commit自动提交偏移量,设置auto.commit.interval.ms控制自动提交的频率。
集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。
c)、手动提交offset场景示例
我们也可以手动控制offset提交,可以让消费者对消息进行一定的逻辑处理,未处理完成前,这个消息就不应该认为是已经消费的,直到完成了整个处理,再去手动提交offset。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testConsumerGroupOne");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("topicTest2","topicLxwTest"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//将buffer的数据进行提交或处理
System.out.println("提交数据");
consumer.commitSync();//手动提offset
buffer.clear();
}
}
上面的例子使用commitSync表示所有收到的消息”已提交”,我们也可以进行更精细的控制:处理完每个分区的消息后,进行该分区偏移量的提交,并且制定一个明确的偏移量。需要注意的是,已提交的offset应始终是程序将读取的下一条消息的offset,因此在程序中对得到的目前的offset进行了+1的处理。
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
d)、订阅指定的分区
上面例子中,consumer订阅topic,由kafka自动动态分配该topic的一个或一个以上分区给该消费者,我们也可以通过assign方式直接手动指定topic分区。手动分配分区(即assgin)和动态分配分区的订阅topic模式(即subcribe)不能混合使用,一旦手动分配分区,分区的设置就只能通过调用assign修改。因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。
这种模式的使用场景有可能为:该消费者进程与所订阅的topic的某一个分区在一台系统上,或者“距离”比较近,则可以手动指定分区,而不是让kafka自动分配一个“距离”可能非常远的分区。
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
e)、手动控制消费者位置
大多数情况下,消费者只是简单的从头到尾的消费消息。但是kafka也支持手动控制消费的位置,可以消费之前的消息也可以跳过最近的消息。可以使用seek(TopicPartition, long)方法指定新的消费位置,也可使用seekToBeginning(Collection) 和 seekToEnd(Collection)方法查找服务器保留的最早和最新的offset。
f)、动态流量控制
若果消费者被分配了多个分区,在某些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。例如处理器从2个分区获取消息并把这两个分区的消息进行合并,当其中一个长时间落后另一个,则暂停消费快的那一个,以便落后的赶上来。可以使用pause(Collection)和resume(Collection)来暂停消费指定的分区、重新开始消费指定的分区。
g)、批量消费
可以发现,poll拉取的消息是一批消息而不是一个消息,拉取消息数目和等待时间(等待一定时间内消息数目不够任然拉取消息)可以通过消费者配置项配置。批量消费可以让服务器端减少IO操作,提高效率。
3、流处理
通过下面描述,在maven工程中引入jar包。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。KafkaStreams类管理Kafka Streams实例的生命周期,在内部,KafkaStreams实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读取和写入。可以通过TopologyBuilder类定义一个计算逻辑处理器,或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
四、配置
kafka的配置包括Broker、Topic、Producer、Customer、Streams、Connect、AdminClient等的配置,内容较多,详见
消息中间件学习笔记(三)——kafka配置
。
五、操作
-
添加topic
(数据发布到不存在的topic时,系统会自动创建该topic):
bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –create –topic my_topic_name –partitions 20 –replication-factor 3 –config x=y
partitions:分区数;replication-factor:备份数(如若设置为3,最多允许两个服务器挂掉而不丢失数据) -
修改topic配置:
bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –alter –topic my_topic_name –partitions 40 -
增加topic配置项:
bin\windows\kafka-configs.bat –zookeeper zk_host:port/chroot –entity-type topics –entity-name my_topic_name –alter –add-config x=y -
删除topic配置项:
bin\windows\kafka-configs.bat –zookeeper zk_host:port/chroot –entity-type topics –entity-name my_topic_name –alter –delete-config x -
删除topic
(需要服务器配置delete.topic.enable=true)
:
bin\windows\kafka-topics.bat –zookeeper zk_host:port/chroot –delete –topic my_topic_name -
查看所有topic:
bin\windows\kafka-topics.bat –zookeeper 127.0.0.1:2181 –list -
故障后恢复平衡leader:
Kafka有一个首选副本的概念,如果一个分区的副本列表是1,5,9,节点1将优先被选为leader,因为它较早存在于副本中。当节点1所在服务器关机或故障后,以该服务器作为leader的分区将重新选择节点,这意味着在默认情况下,当这个服务器重启之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。为了避免这种不平衡,可以通过运行以下命令让Kafka集群尝试恢复已经修复正常的服务器的原分区leader地位
bin\windows\kafka-preferred-replica-election.bat –zookeeper zk_host:port/chroot
也可以通过配置项auto.leader.rebalance.enable=true让系统自动执行 -
列出消费者组中所有消费者:
bin\windows\kafka-consumer-groups.bat –bootstrap-server localhost:9092 –list -
查看consumer位置
(查看消费者群中所有消费者目前的位置):
bin\windows\kafka-consumer-groups.bat –bootstrap-server localhost:9092 –describe –group my-group
六、安全
kafka目前支持以下安全措施(安全是可选的,kafka支持非安全集群,以及经过身份验证,未认证,加密和未加密客户端的组合):
-
使用SSL或SASL验证来自客户端(producers和consumers)、其他brokers和工具的连接。Kafka支持以下SASL机制:
SASL/GSSAPI (Kerberos) – 从版本0.9.0.0开始
SASL/PLAIN – 从版本0.10.0.0开始
SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 – 从版本0.10.2.0开始
SASL/OAUTHBEARER – 从2.0版本开始 - 验证从brokers 到 ZooKeeper的连接
- 对brokers与clients之间、brokers之间或brokers与工具之间使用SSL传输对数据加密(注意,启用SSL时性能会下降,其大小取决于CPU类型和JVM实现)。
- 授权客户端的读写操作
- 验证是可插拔的,并且支持与外部验证服务的集成
1、使用SSL加密和认证
kafka允许客户端通过SSL方式连接(SSL的介绍见
小知识——SSL介绍
),步骤大致为:生成密钥和证书——建立自己的CA——签名证书——broker配置——客户端配置。生成密钥及证书使用java的keytool,生成CA使用openssl,下面仅根据做简要流程展示(笔者未验证是否可行),若以后实际用到,将另外开文章详细介绍keytool及openssl的使用方法。
-
为每个broker生成密钥及证书:
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
可以通过以下命令验证生成的证书:
keytool -list -v -keystore server.keystore.jks -
通过openssl生成自己的CA:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -
将生成的CA添加到“客户端信任存储区”,这样客户端才能详细这个CA:
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
注意:若果将Kafka broker配置成需要客户端授权, 可以在Kafka broker配置中设置setting ssl.client.auth属性为“requested”或者“required”。然后同样也给 Kafka broker 提供一个信任存储区,并且这个信任存储区应该拥有所有给客户端密钥签名的 CA 证书。
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
客户端的信任存储区(truststore)保存所有客户端需要信任的证书,导入一个证书到一台机器的信任存储区,意味着这台机器将信任所有被这个证书签名的证书。当在大型的 Kafka 集群中部署 SSL 时,这一特征特别有用,你可以用一个单一的 CA 给集群中的所有证书签名,而且可以让所有机器共享信任该 CA 的信任存储区,这样,每个机器都可以认证除自己之外的全部机器。 -
用生成的CA为第一步生成的所有证书签名:
-
) 首先从密钥库中导出证书:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file -
) 然后用CA签名证书:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234 -
) 然后把CA 的证书和被签名的证书一起导入密钥库中:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
-
) 首先从密钥库中导出证书:
-
配置broker:
-
) 配置listeners(若broker 之间的通信没有启用 SSL,listeners中PLAINTEXT和SSL两个协议都需要提供端口信息):
listeners=PLAINTEXT://host.name:port,SSL://host.name:port -
) 配置以下信息:
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234 -
) 可选配置:
ssl.client.auth=none(“required”=>客户端授权是必须的,“requested”=>客户端授权是需要的,但是没有证书依然可以连接。因为“requested”会造成错误的安全感,而且在客户端配置错误的情况下依然可以连接成功,所以不鼓励使用)
ssl.cipher.suites(指定的密码套件,由授权、加密、MAC和密钥交换算法组成,用于给使用TLS或者SSL协议的网络协商安全设置)
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1(列出你将要从客户端接收的SSL协议。注意,SSL已经废弃,支持TLS,并且不建议在生产环境中使用SSL)
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG - ) 若想broker间的通信启用 SSL(默认为PLAINTEXT),配置:security.inter.broker.protocol=SSL
-
) 用以下命令可以快速验证服务器的 keystore 和 truststore 是否设置正确(TLSv1需要在ssl.enabled.protocols中列出):
openssl s_client -debug -connect localhost:9093 -tls1
-
) 配置listeners(若broker 之间的通信没有启用 SSL,listeners中PLAINTEXT和SSL两个协议都需要提供端口信息):
-
配置客户端:
-
) 若broker不需要客户端授权,下面是最小配置:
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234 -
) 若要客户端授权, 必须像第一步一样创建一个 keystore,同时配置下面这些信息:
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234 - ) 若broker设置其他可选配置,客户端也要做相应配置。
-
) 若broker不需要客户端授权,下面是最小配置:
2、SASL验证
SASL全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的机制。下面仅对kafka的SASL实现流程做简要介绍,具体概念及详细使用方法将在以后用到时再做学习。
kafka使用JAAS(java的验证和授权api)来完成sasl(需要先大致了解JAAS的原理),使用前先配置kafka的brokers和clients的jaas。在jaas的配置文件中有一些节点名称,其中,
KafkaServer
是broker在jaas文件中的默认节点名称,
Client
是用于认证SASL与zookeeper之间连接的节点,
KafkaClient
是kafka客户端节点。配置时,建立几个XXX.jaas.conf文件(如kafka_server.jaas.conf等),运行kafka server、kafka client时将配置文件作为参数传递给JVM(如-Djava.security.auth.login.config=/etc/kafka/kafka_server.jaas.conf)。客户端在配置jaas时有个例外,可以不按照上面所说的配置文件配置后指定为参数的方式进行配置,而是通过客户端属性sasl.jaas.config进行配置。不同的SASL机制,配置文件也略有不同。
kafka支持以下四种SASL机制(一个broker节点上可以同时使用多种机制):
- GSSAPI (Kerberos)
- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512
kafka使用SASL验证时,需要在server.properties中的listeners添加SASL端口(SASL_PLAINTEXT 或者 SASL_SSL)如:listeners=SASL_PLAINTEXT://host.name:port。如果只配置了这一个端口,需要确保broker之间的通信中使用了相同的协议:security.inter.broker.protocol=SASL_PLAINTEXT(or SASL_SSL)。
a)、使用 SASL/Kerberos 认证
需要用到Kerberos 认证协议服务器、身份验证证书,关键文件有:krb5.conf、xxxx.keytab,关于kerberos的概念及使用,以后实际用到时再做学习。
首先配置broker,建立一个kafka_server.jaas.conf的文件,文件内容示例如下:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab=”/etc/security/keytabs/kafka_server.keytab”
principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;
useTicketCache=false
};Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab=”/etc/security/keytabs/kafka_server.keytab”
principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;
useTicketCache=false
};
在server.properties中确保SASL端口和安全机制配置正确,同时确保配置和Kafka broker证书中名称相匹配(上面示例证书中名称为kafka/kafka1.hostname.com@EXAMPLE.COM,所以sasl.kerberos.service.name为kafka):
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPIsasl.kerberos.service.name=kafka
将JAAS和krb5(可选)的文件位置作为JVM参数传递给broker:
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
下面配置客户端,同样建立一个kafka_client.jaas.conf配置文件,将JAAS和krb5(可选)的文件位置作为JVM参数传递给客户端所在服务器,同时确保客户端属性配置正确:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab=”/etc/security/keytabs/kafka_server.keytab”
principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;
useTicketCache=false
};Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab=”/etc/security/keytabs/kafka_server.keytab”
principal=”kafka/kafka1.hostname.com@EXAMPLE.COM”;
useTicketCache=false
};
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
b)、使用SASL/PLAIN认证
SASL/PLAIN是一种简单的username/password认证机制,通常与TLS加密一起使用。不需要Kerberos,同样需要设置JAAS。broker和client设置示例如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=”admin”
password=”admin-secret”
user_admin=”admin-secret”
user_alice=”alice-secret”;
};
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=”alice”
password=”alice-secret”;
};
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
七、知识点补充
1、配额
从0.9开始,kafka可以对生产、消费客户端进行配额,通过配额来控制客户端使用broker资源。配额的原因是:有些生产/消费客户端可能生成/消费大量数据,从而垄断相关broker的资源,造成网络饱和,对其他客户端和broker本身造成DOS攻击,通过配额可以避免这个问题,这种问题才大型多节点集群中尤为重要。
八、后记
以上是关于kafka的入门学习笔记,还有kafka安全方面的更多应用、connector使用、stream使用等内容,请参考下面的参考链接部分,以后实际用到时再做补充。
参考链接:
http://kafka.apachecn.org
https://www.jianshu.com/p/d3e963ff8b70
http://www.orchome.com/kafka/index