消息发送流程
producer发送消息采用的是异步发送的方式。
消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
batch.size:只有数据积累到batch.size之后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
异步发送API
首先打开IDEA创建maven工程,向pom.xml导入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
编写代码
a.自定义CustomProducer(不带回调函数)
package com.hhy.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
//创建生产者的配置对象
Properties properties = new Properties();
//2.给配置对象添加参数
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 批次大小 默认16K
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// RecordAccumulator缓冲区大小 默认32M
properties.put("buffer.memory", 33554432);
// key,value序列化
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3.创建kadka的生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//4.调用dend方法发送消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String,String>("first","hello world" + i));
}
//5.关闭连接
producer.close();
}
}
b.自定义CustomProducer(带回调函数)
当producer收到ack时会进行异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。消息发送失败会自动重试
package com.hhy.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallBack {
public static void main(String[] args) throws InterruptedException {
//创建生产者的配置对象
Properties properties = new Properties();
//2.给配置对象添加参数
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 批次大小 默认16K
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// RecordAccumulator缓冲区大小 默认32M
properties.put("buffer.memory", 33554432);
// key,value序列化
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3.创建kadka的生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//4.调用dend方法发送消息
for (int i = 0; i < 500; i++) {
producer.send(new ProducerRecord<String, String>("first", "hello world" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//判断是否发送成功
if (exception == null){
//发送成功打印消息的元数据
System.out.println(metadata);
}else{
//发送失败返回异常
exception.printStackTrace();
}
}
});
//线程睡眠,避免数据发送到同一个分区
Thread.sleep(2);
}
//5.关闭连接
producer.close();
}
}
同步发送API
同步发送后会阻塞线程,直到返回ack.可以通过send方法实现同步发送,需要调用send()方法返回的future对象的get()方法
// 4. 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
// 异步发送 默认
// kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
}
分区策略
原则
a.指明partition的情况直接作为partition值
b.没指明partition但是有key的情况下,将key的值与topic的partition数进行取模得到partition的值
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
c.没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用
a.代码
//4.调用dend方法发送消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String,String>("first",i,"","hello world" + i));
}
b.代码
//4.调用dend方法发送消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String,String>("first",i+"","hello world" + i));
}
分区器
a.默认的分区器 DefaultPartitioner
b.自定义分区器 MyPartitioner
package com.hhy.kafka.partitions;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//需求:如果消息中包含hello,发往0号分区,不包含发往1号分区
String log = value.toString();
//判断
if (log.contains("hello")){
return 0;
}
return 1;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs){}
}
通过修改CustomConsumer里的配置信息即可使用自定义分区器(注意要使用自定义分区器全类名)
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.partitioner.MyPartitioner");