2.1客户端开发
一个正常的生产逻辑需要具备以下几个步骤:
(1)配置生产者客户端参数以及创建相应的生产者实例。
(2)构建待发送的消息。
(3)发送消息。
(4)关闭生产者实例。
生产者客户端示例代码
public class KafkaProducerAnalysis {
//配置客户端参数
public static final String brokerList = ” localhost:9092 ” ;
public static final StrIng topic = ” topic-demo ”;
public static Properties intConfig() {
Properties props= new Properties() ;
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) ;
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo") ;
return props;
}
//主函数 创建生产者 发送消息
public static vo main (String[] args) {
//①获取配置信息
Properties props = intConfig();
//②创建kafka生产者
KafkaProducer<String , String> producer= new KafkaProducer<>(props);
//③构建消息对象
ProducerRecord<String, String> record = new ProducerRecord<> (topic,"hello , Kafka !” ) ;
//④发送消息
try {
producer.send(record);//发后即忘
}catch(Exception e) {
e.printStackTrace();//异常处理:实际开发中 需要重新处理异常,不只是简单的打印
}
}
}
2.1.1 必要的参数配置
在创建生产者实例之前有三个参数是必须配置的:bootstrap.servers、key serializer 和 value serializer
① bootstrap.servers :
该参数用来指定生产者客户端连接 Kafka 集群所需的 broker地址清单,
具体的内容格式为 host1:port1,host2:port2 ,可以设置一个或多个地址,此参数的默认值为“”。
并不需要所有的broker地址,建议写两个以上,保证其中一个宕机,可以连接另一个。
② key serializer 和 value serializer:
broker 端接收的消息必须以字节数组(byte[])的形式存在。
在发broker 前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。
这两个参数分别用来指定 key 和value 列化操作的序列器,这两个参数无默认值。必须填写序列化器的定名
KafkaProducer参数众多,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig类
每个参数在 ProducerConfig 类中都有对应的名称,避免参数全类名书写错误
2.1.2 消息的发送
创建完生产者实例后,就需要构建消息,即创建ProduceRecord对象,对应的构造方法:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key , V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key , V value)
public ProducerRecord(String topic , K key, V value)
public ProducerRecord(String topic, V value)
发送消息的三种模式
1.发后即忘
只管往Kafka发送消息,不关系消息是否正确到达。 发送性能最高,可靠性最差
try {
producer.send(record);//发后即忘
}catch(Exception e){
e.printStackTrace();
}
2.同步发送sync
同步发送消息可靠性最高,性能最差,要么发送成功,要么失败。需要阻塞等待消息发送完再发送下一条。
KafkaProducer的send()方法 返回值不是void类型 而是Future<RecordMetadata>
要实现同步发送,可以利用返回的Future对象实现,在执行 send()方法之后直接链式调用get()方法
阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交给由外层逻辑处理。
try {
producer.send(record).get;//同步
}catch(ExecutionException | InterruptedException e){
e.printStackTrace();
}
也可以在send()方法后 不直接链式调用get()方法,如下
try {
Future<RecordMetadata> future= producer.send(record) ;
RecordMetadata metadata =future.get();
System out.println(metadata top () + ” - " +metadata.partition() + ”: ” + metadata . offset() );
}catch((ExecutionException | InterruptedException e){
e.printStackTrace();
}
这样可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,
比如当前消息的主题、分区号、分区中的偏移量( offset 〕、时间戳等。
如果在应用代码中需要这些信息,则可以使用这个方式,如果不需要,则直接链式调用get()的方式更省事。
3.异步发送async
一般是在send()方法中指定一个Callback的回调函数,Kafka在返回响应时,
调用该回调函数来实现异步的发送确认。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata , Excepti on exception){
if (exception ! = null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() +”-” +metadata.partition() + ”:” +metadata.offset());
}
}
}
onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata不为null, exception为null;失败相反
2.1.3 序列化
生产者需要用列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka,
消费者需要用反序列器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。
2.1.4 分区器(生产者消息 分区分配策略)
分区器的作用就是为消息分配分区。
1.指定了partition字段
如果消息 ProducerRecord 中指定了partition字段,那么就不需要分区器,partition代表的就是所要发往的分区号。
2.没有指定partition字段
(1)使用Kafka默认分区器
ProducerRecord 中没有指定partition字段,会使用Kafka默认分区器。
默认分区器是 orgapche.kafka.clients.producer.intenals.DefaultPartitioner
它实现了 org.apache.kafka.clients.producer.Partitioner接口,接口中定义了2个方法:
public int partition(String topic , Object key , byte[] keyBytes,
Object value , byte[] valueBytes, Cluster cluster);
public void close();
其中partition()方法定义了分区分配逻辑
①如果key 不为null。对key做哈希(MurmurHash2算法,高运算性能和低配碰撞率),得到的hash值作为分区号
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
②如果key为null。 消息将会以轮询的方式发往主题内的各个可用分区。(可用分区:存在leader副本)
注意:key不为null时,计算得到的分区号是有所分区中的任意一个
key为null时,计算得到的分区号仅为可用分区的中任意一个
DefaultPartitioner源码解读
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
/* 首先通过cluster从元数据中获取topic所有的分区信息 */
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//拿到该topic的分区数
int numPartitions = partitions.size();
//如果消息记录中没有指定key
if (keyBytes == null) {
//则获取一个自增的值
int nextValue = nextValue(topic);
//通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
//如果该topic存在可用的分区
if (availablePartitions.size() > 0) {
//那么将nextValue转成正数之后对可用分区数进行取余
int part = Utils.toPositive(nextValue) % availablePartitions.size();
//然后从可用分区中返回一个分区
return availablePartitions.get(part).partition();
} else { // 如果不存在可用的分区
//那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else { // 如果消息记录中指定了key
// 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
//toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
//nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
//方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
private int nextValue(String topic) {
//每个topic都对应着一个计数
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) { // 如果是第一次,该topic还没有对应的计数
//那么先生成一个随机数
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
//然后将该随机数与topic对应起来存入map中
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
//之后把这个随机数返回
counter = currentCounter;
}
}
//一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
return counter.getAndIncrement();
}
(2)自定义分区器
使用自定义的分区器,只需同DefaultPartitioner 一样实现Partitioner接口即可。
总结:生产者分区分配策略
1.如果指定了如果消息 ProducerRecord 中指定了partition字段,那么就不需要分区器
2.没有指定partition
(1)默认分区器
①k如果key 不为null。对key做哈希,得到的hash值作为分区号
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
②如果key为null。 消息将会以轮询的方式发往主题内的各个可用分区。
(2)自定义分区器
2.1.5 生产者拦截器
Kafka一共有两种拦截器:生产者拦截器 和 消费者拦截器
生产者拦截器 可以用来在消息发送前做一些准备工作,比如按规则过滤消息、修改消息
也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类需求
生产者拦截器需要自定义实现 org.apache.kafka.clients.producer.Producerlnterceptor接口。
KafkaProducer还可以指定多个拦截器以形成拦截链。按照interceptor.classes 参数配置顺序执行。
2.2 原理分析
本节的内容主要是对Kafka 生产者客户端的内部原理进行分析,
通过了解生产者客户端的整体脉络可以让我们更好地使用它,避免因为一些理解上的偏 而造成使用上的错误。
2.2.1整体架构
2.2.2 元数据的更新
todo
2.2.3 重要的生产者参数
1.ack
用来指定分区中必须有多少副本收到这条消息,之后生产者才会认为这条消息成功写入。
① ack = 1
默认值为1,生产者发送消息后,只要分区的leader副本成功写入消息后,那么它就会收到来自服务端成功响应。
丢数据情况:当leader收到消息,返回响应成功,此时leader挂掉,且是在被其他follower
副本拉取之前leader副本崩溃,重新选举的leader中并有这条消息,数据丢失
②ack = 0
生产者发送消息之后不需要等待任何服务端的响应。
如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,
那么生产者也无从得知,消息也就丢失了。
③ acks = -1 或者 all 。
生产者在消息发送之后,需要等待ISR 中的所有副本都成功写入消息之
后才能够收到来自服务端的成功响应,此设置可以达到最强的可靠性。
但这并不意味着消息就一定可靠,因ISR中可能只有leader 副本,退化成了 acks=1 的情况
注意 acks 参数配置的值是一个字符串类型,而不是整数类型。配置方式如下:
properties.put(ProducerConfig.ACKS_CONFIG,"0");
2.其他参数见书
申明:本文内容来源于书籍《深入理解kafka核心设计与实践原理》 个人读书笔记仅用于学习,侵权删。
版权声明:本文为lightupworld原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。