写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!
前面生产者在发送消息异常的时候提供了事务机制,确保消费者不会接收到一些错误的或者不需要的消息,那么当消费者对消息消费处理的时候如果发生了异常呢,当然也需要处理一下异常了,一般我们在监听方法中,只是监听topic中的数据并消费,如果再try catch捕获并处理的话,则会显得代码块非常臃肿不利于维护,当然可以使用springboot提供的@ControllerAdvice注解来统一的异常处理,其实spring-kafka为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理consumer在消费时发生的异常。使用异常处理器也很简单,只要注册异常处理器并在监听器中引用使用即可,具体使用方式如下记录:
一、注册异常处理器
/**
* 自定义消费者异常处理器
*/
@Configuration
public class CustomListenerErrorHandler {
@Bean
public ConsumerAwareListenerErrorHandler listenerErrorHandler(){
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
System.out.println("--- 消费时发生异常 ---");
return null;
}
};
}
}
二、使用异常处理器
异常处理器注册好之后,将这个异常处理器的BeanName(默认就是方法名)放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
@KafkaListener(topics = {"mytopic"},errorHandler = "listenerErrorHandler")
public void consumer(List<String> message){
System.out.println("接收到的消息:" + message);
// 模拟消费时发生异常
message.forEach(m ->{
if(m.equals("prefix:msg-10")){
throw new RuntimeException("fail");
}
});
}
三、测试下
还是用之前的测试方法:
@Transactional
@GetMapping("/send13")
public void test13() {
for (int i = 0; i < 23; i++) {
kafkaTemplate.send(topic, "msg-" + i);
}
}
启动项目,访问
http://localhost:8080/send13
结果如下:
如上,可以看到异常处理器已经能正常使用了。