When the consumer receives messages in batches, auto-commit must be disabled in the configuration and manual acknowledgment enabled.
spring:
  kafka:
    ########### Kafka cluster ###########
    bootstrap-servers: 192.168.188.128:9092
    producer:
      retries: 0 # number of retries on failure
      acks: 1 # ack level: how many partition replicas must have persisted the write before the producer gets an ack (0, 1, all/-1)
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer-side buffer size
      # serializer/deserializer classes provided by Kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # settings without a dedicated property go into this map and are passed through on initialization
      # properties:
      #   # custom partitioner
      #   partitioner.class: com.example.demo.kafka.CustomizePartitioner
    consumer:
      group-id: javagroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      # earliest: if the partition has a committed offset, resume from it; otherwise consume from the beginning
      # latest: if the partition has a committed offset, resume from it; otherwise consume only newly produced records
      # none: if every partition has a committed offset, resume from it; if any partition has none, throw an exception
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50 # maximum number of records returned per poll when consuming in batches
    listener:
      type: batch # batch listener
      ack-mode: manual_immediate # manual acknowledgment, committed immediately after acknowledge()
Note: batch consuming and batch sending are independent of each other. You can send one message at a time and still consume in batches, or send in batches and consume in batches.
Batch sending:
Producer code:
@Data
public class KafkaMessage {
    private Integer index;
    private String id;
    private String value;
}

@GetMapping("BatchSend")
public void sendProducerRecord() {
    for (int i = 0; i < 100; i++) {
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setIndex(i);
        kafkaMessage.setId(UUID.randomUUID().toString());
        kafkaMessage.setValue("producerRecord " + i);
        ProducerRecord<String, Object> producerRecord =
                new ProducerRecord<>("topic1", "key1", JSONObject.toJSONString(kafkaMessage));
        kafkaTemplate.send(producerRecord);
    }
}
Consumer code:
@KafkaListener(id = "consumer1",groupId = "javagroup", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println("****"+record.value());
}
}
Send the request with ApiPost:
When the producer sends one message at a time and the consumer consumes in batches:
Producer:
@GetMapping("Producer")
public void sendMessage(String normalMessage) {
    kafkaTemplate.send("topic1", normalMessage);
}
The consumer stays the same.
Send the request with ApiPost:
Note: if batch listening is switched off in the configuration while the consumer method still receives a List of records, an error is thrown.
So when consuming in batches, be sure to enable the batch listener and manual acknowledgment in the configuration.
Conversely, when the batch listener and manual acknowledgment are enabled but the consumer is a plain single-record listener, an error is thrown whether messages are sent one by one or in batches.
Summary:
Batch config off, batch consumer: error.
Batch config on, single-record consumer: error.
The configuration and the consumer method signature must therefore match.
When the producer sends in batches, batch listening and manual acknowledgment are both switched off in the configuration, and the consumer handles one record at a time:
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record) {
    System.out.println("simple consume: " + record.topic() + "-" + record.partition() + "-" + record.value());
}
2. Callbacks
KafkaTemplate's send() returns a future with an addCallback method, in which we can check whether the message was sent successfully and run compensation logic when it fails. There are two ways to write it.
// Producer with a callback: .addCallback(SuccessCallback<>, FailureCallback<>)
@GetMapping("CallBackOne")
public void sendMessage2(String callBackMessage) {
    kafkaTemplate.send("topic1", callBackMessage).addCallback(successCallBack -> {
        String topic = successCallBack.getRecordMetadata().topic();
        int partition = successCallBack.getRecordMetadata().partition();
        long offset = successCallBack.getRecordMetadata().offset();
        System.out.println("message sent successfully: " + topic + "-" + partition + "-" + offset);
    }, failureCallBack -> {
        System.out.println("failed to send message: " + failureCallBack.getMessage());
    });
}
The second way:
// Producer with a callback: addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {})
@GetMapping("CallBackTwo")
public void sendMessage3(String callBackMessage) {
    kafkaTemplate.send("topic2", callBackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("failed to send message: " + ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("message sent successfully, offset: " + result.getRecordMetadata().offset());
        }
    });
}
Extra: global callback
Note: once this is set, the singleton KafkaTemplate carries the callback globally; setting it on the one instance makes it apply to every send.
@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("Message send success : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
System.out.println("Message send error : " + producerRecord.toString());
}
}
@Autowired(required = false)
private KafkaSendResultHandler producerListener;

// global callback
@GetMapping("GlobalCallBack")
public void testProducerListen() throws InterruptedException {
    kafkaTemplate.setProducerListener(producerListener);
    kafkaTemplate.send("topic1", "test producer listen");
    Thread.sleep(1000);
}
Now call another send endpoint that has no per-send callback:
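For illustration, such an endpoint might look like the sketch below (the mapping path and parameter name here are made up):

@GetMapping("NoCallBack")
public void sendWithoutCallback(String normalMessage) {
    // no addCallback here; the global KafkaSendResultHandler still reports the send result
    kafkaTemplate.send("topic1", normalMessage);
}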
Even though this endpoint sets no callback of its own, the global callback registered on the template still fires.
3. Custom partitioner
As we know, each Kafka topic is divided into multiple partitions, so when a producer sends a message to a topic, which partition does it end up in? That is the partitioning strategy. Kafka provides a default strategy and also supports custom ones. The routing rules are as follows (each case is shown in the sketch after this list):
① If the send specifies a partition explicitly (including via a custom strategy), the message is appended to that partition;
② If no partition is specified but a key is (Kafka allows a key per message), the key is hashed and the result determines the partition; this guarantees that all messages with the same key land in the same partition;
③ If neither partition nor key is given, Kafka's default strategy picks a partition (e.g. round-robin).
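A minimal sketch of the three cases with KafkaTemplate (topic name and payloads are illustrative):

// ① explicit partition: the record goes straight to partition 0
kafkaTemplate.send("topic1", 0, "key1", "to partition 0");
// ② key only: the key is hashed to pick the partition, so the same key always lands in the same partition
kafkaTemplate.send("topic1", "key1", "routed by key");
// ③ neither partition nor key: the default strategy chooses the partition
kafkaTemplate.send("topic1", "default routing");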
※ Let's define a custom partitioning strategy that routes messages to partitions we choose. Create a partitioner class that implements the Partitioner interface and override its methods; the return value of partition() is the number of the partition the message is sent to.
public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // get the partition info for the topic
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        System.out.println("partitionInfoList" + partitionInfoList);
        System.out.println("topic: " + topic + "-" + "key: " + key.toString() + "-" + "value: " + value.toString());
        int size = partitionInfoList.size();
        System.out.println("number of partitions: " + size);
        // simulate customer-service numbers: route them to the last partition
        if (key.toString().equals("10000") || key.toString().equals("11111")) {
            return size - 1;
        }
        // other numbers: hash the 3-digit prefix into the remaining partitions
        // (Math.abs guards against a negative hashCode, which would be an invalid partition number)
        String phoneNum = key.toString();
        return Math.abs(phoneNum.substring(0, 3).hashCode()) % (size - 1);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
Write a Kafka producer that uses the custom partitioner:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * @program: demo
 * @description: producer for testing the custom partitioner
 * @author: SYP
 * @create: Created on 2022-05-16 11:38
 **/
public class PartitionerProducer {
    private static final String[] PHONE_NUMS = new String[]{
            "10000", "10000", "11111", "13700000003", "13700000004",
            "10000", "15500000006", "11111", "15500000008",
            "17600000009", "10000", "17600000011"
    };

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // producer configuration
        Properties props = new Properties();
        // Kafka addresses, comma-separated if there are several
        props.put("bootstrap.servers", "192.168.188.128:9092");
        props.put("acks", "all");             // wait for the full in-sync replica set: slowest but most durable
        props.put("retries", 3);              // number of retries on failure
        props.put("batch.size", 16384);       // batch size in bytes
        props.put("linger.ms", 1);            // wait up to 1 ms to fill batches instead of sending immediately, reducing the number of requests
        props.put("buffer.memory", 33554432); // total memory available to the producer for buffering
        // custom partitioner
        props.put("partitioner.class", "com.example.demo.kafka.CustomizePartitioner");
        // serializer classes, given by fully qualified name
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        int count = 0;
        int length = PHONE_NUMS.length;
        while (count < 10) {
            Random rand = new Random();
            String phoneNum = PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("topic4", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            String result = "phoneNum [" + record.value() + "] has been sent to partition " + metadata.partition();
            System.out.println(result);
            Thread.sleep(500);
            count++;
        }
        producer.close();
    }
}
Write a consumer to consume the data:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @program: demo
 * @description: consumer for the custom partitioner test
 * @author: SYP
 * @create: Created on 2022-05-18 09:42
 **/
public class PartitionerConsumer {
    private static KafkaConsumer<String, String> consumer;
    private final static String TOPIC = "topic4";

    public PartitionerConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.188.128:9092");
        // consumer group id
        props.put("group.id", "javagroup");
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        // how often the offsets of consumed messages are committed
        props.put("auto.commit.interval.ms", "1000");
        // session timeout: if no heartbeat arrives within this window, the consumer is considered dead and its partitions are reassigned
        props.put("session.timeout.ms", "30000");
        // where to start when there is no valid committed offset
        // earliest: read the partition from the beginning
        // latest: read only newly produced records
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
    }

    public void consume() {
        // subscribe and poll in a loop
        consumer.subscribe(Arrays.asList(TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("consumer: partition=%d, offset=%d, key=%s, value=%s",
                            record.partition(), record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new PartitionerConsumer().consume();
    }
}
Let's take a look: topic4 has 3 partitions.
Run the producer and the consumer and check the output:
4. Kafka transactions
If a send needs to run inside a transaction, KafkaTemplate's executeInTransaction method can declare one:
@GetMapping("transaction")
public void snedMessage7(){
// 声明事务:后面报错消息不会发出去
// kafkaTemplate.executeInTransaction(operationsCallBack->{
// operationsCallBack.send("topic1","test executeInTransaction");
// throw new RuntimeException("fail");
// });
//不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
Calling the endpoint shows that without a declared transaction the exception is thrown but the earlier message has already gone out.
Now declare the transaction:
@GetMapping("transaction")
@Transactional
public void snedMessage7(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operationsCallBack->{
operationsCallBack.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
//不声明事务:后面报错但前面消息已经发送成功了
// kafkaTemplate.send("topic1","test executeInTransaction");
// throw new RuntimeException("fail");
}
Calling the endpoint now fails with:
Producer factory does not support transactions
This is because we are sending inside a Kafka transaction without having enabled transactions; they need to be turned on in the configuration:
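For example, adding a transaction id prefix to the producer section (the prefix value tx- is arbitrary; it is its presence that makes the producer factory transactional):

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-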
Calling again fails with:
Could not create Kafka transaction; nested exception is org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
Change the configuration:
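For example, raising the retries we originally set to 0 (the value 3 is arbitrary, it just has to be non-zero):

spring:
  kafka:
    producer:
      retries: 3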
Calling again fails with:
Could not create Kafka transaction; nested exception is org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
Change the configuration:
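For example, switching the ack level from 1 to all, since the idempotent producer requires acknowledgement from all in-sync replicas:

spring:
  kafka:
    producer:
      acks: all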
Calling again now logs:
Failing batch since transaction was aborted, which shows the transaction is taking effect.
Calling the plain kafkaTemplate.send variant (outside executeInTransaction) now fails with:
No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
The fix is to add the @Transactional annotation to the sending method.
5. Consumers
1. Consuming a specified topic, partition, and offset
Earlier, the listener on topic1 consumed every message on the topic. What if we want to consume a specific topic, a specific partition, or start from a specific offset? That is also simple; the @KafkaListener annotation provides it all:
/* Listen to partition 0 of topic1, and to partition 0 of topic4 plus partition 1 of topic4 starting at offset 8.
   Note: topics and topicPartitions cannot be used at the same time. */
@KafkaListener(id = "consumer1", groupId = "javagroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = { "0" }),
        @TopicPartition(topic = "topic4", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
    System.out.println("topic:" + record.topic() + "|partition:" + record.partition()
            + "|offset:" + record.offset() + "|value:" + record.value());
}
After running PartitionerProducer:
There is nothing from topic1 because we did not send any messages to topic1.
Attribute explanation:
① id: the consumer id;
② groupId: the consumer group id;
③ topics: the topic(s) to listen to, more than one allowed;
④ topicPartitions: more detailed listening configuration in which topic, partition, and offset can all be specified.
The onMessage2 listener above therefore listens to partition 0 of topic1, plus partition 0 of topic4 and partition 1 of topic4 starting at offset 8.
Note: topics and topicPartitions cannot be used at the same time.
2. ConsumerAwareListenerErrorHandler exception handler
An error handler lets us handle exceptions thrown while the consumer is processing records.
Create a handler method of type ConsumerAwareListenerErrorHandler and register it with @Bean (the bean name defaults to the method name), then put that bean name into the errorHandler attribute of @KafkaListener. When the listener throws, the error handler is invoked automatically.
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;

/**
 * @program: demo
 * @description: Kafka listener error handler
 * @author: SYP
 * @create: Created on 2022-05-17 11:21
 **/
@Component
public class KafkaErrorHandler {

    // register the error handler with @Bean; the bean name defaults to the method name
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("consume error: " + message.getPayload());
            return null;
        };
    }
}
// Put the handler's bean name into the errorHandler attribute of @KafkaListener;
// when the listener throws, the error handler is invoked automatically.
@KafkaListener(topics = {"topic1"}, errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("simple consume - simulated exception");
}

// Batch consumption works the same way; message.getPayload() in the handler still exposes each record.
@KafkaListener(topics = "topic1", errorHandler = "consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("consumed one batch...");
    throw new Exception("batch consume - simulated exception");
}
Run it and check the result:
3. Message filter
A message filter intercepts messages before they reach the consumer. In practice we can use it to keep only the messages the business needs and hand those to the KafkaListener, dropping the rest.
Configuring it only requires giving the listener container factory a RecordFilterStrategy (filter strategy): when it returns true the record is discarded, when it returns false the record reaches the listener normally.
@Component
public class KafkaListenerFilter {

    @Autowired
    ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // acknowledge the records that the filter discards so their offsets are still committed
        factory.setAckDiscarded(true);
        // filter strategy: return true to discard the record, false to let it through
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false; // even values pass
            }
            return true; // odd values are filtered out
        });
        return factory;
    }
}
// listener that uses the filtering container factory
@KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
    System.out.println(record.value());
}

// producer endpoint for testing the filter
@GetMapping("KafkaFilter")
public void sendMessage6() {
    for (int i = 1; i <= 100; i++) {
        kafkaTemplate.send("topic1", i + "");
    }
}
4. Message forwarding
In practice we may need application A to fetch a message from topicA, process it, and forward it to topicB, where application B picks it up; in other words, one application forwards its result to another after processing.
With Spring Boot and Kafka, forwarding only takes a @SendTo annotation: the annotated method's return value is the forwarded message, as follows.
/**
 * @Description: messages received from topic1 are processed and forwarded to topic2;
 *               all it takes is a @SendTo annotation, and the method's return value is the forwarded message
 * @Author: SYP
 * @date: 2022/5/17
 */
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    System.out.println("topic: " + record.topic() + " partition: " + record.partition()
            + " offset: " + record.offset() + " value: " + record.value());
    return record.value() + "-forward message";
}

@KafkaListener(topics = {"topic2"})
public void onMessage8(ConsumerRecord<?, ?> record) {
    System.out.println("topic: " + record.topic() + " partition: " + record.partition()
            + " offset: " + record.offset() + " value: " + record.value());
}
5. Starting and stopping listeners on a schedule
By default the listeners start working as soon as the consumer application boots, consuming messages sent to their topics. What if we do not want a listener to start immediately, but only at a time we choose, or want it to stop at a time we choose?
We use KafkaListenerEndpointRegistry. The steps are:
① disable the listener's auto-start;
② create two scheduled tasks, one to start the listener at a given time and one to stop it at another.
Create a task class annotated with @EnableScheduling. KafkaListenerEndpointRegistry is already registered as a bean in the Spring IoC container, so it can be injected directly; the listener's container factory disables auto-start.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @program: demo
 * @description: start and stop a Kafka listener on a schedule
 * @author: SYP
 * @create: Created on 2022-05-17 14:45
 **/
@EnableScheduling
@Component
public class CronTimer {

    /**
     * Methods annotated with @KafkaListener are not registered as beans in the IoC container;
     * they are registered in the KafkaListenerEndpointRegistry,
     * which itself is already a bean in the Spring IoC container.
     **/
    @Autowired(required = false)
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @KafkaListener(id = "timingConsumer", topics = "topic1", containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record) {
        System.out.println("consumed: " + record.topic() + "-" + record.partition() + "-" + record.offset() + "-" + record.value());
    }

    // start the listener at the scheduled time
    @Scheduled(cron = "0 47 15 * * ?")
    public void startListener() {
        System.out.println("starting listener...");
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
    }

    // pause the listener at the scheduled time
    @Scheduled(cron = "0 48 15 * * ?")
    public void shutdownListener1() {
        System.out.println("pausing listener...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}
@Autowired
ConsumerFactory consumerFactory;

// listener container factory with auto-start disabled
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
    ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
    container.setConsumerFactory(consumerFactory);
    // keep the @KafkaListener from starting automatically
    container.setAutoStartup(false);
    return container;
}
@GetMapping("CronTimer")
public void sendMessage8() throws InterruptedException {
System.out.println("定时任务开启");
int i=0;
while (true){
kafkaTemplate.send("topic1",i+"");
i++;
Thread.sleep(3000);
}
}