springboot集成整合kafka-消费者异常处理器ConsumerAwareListenerErrorHandler

  • Post author:
  • Post category:其他




写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!

前面生产者在发送消息异常的时候提供了事务机制,确保消费者不会接收到一些错误的或者不需要的消息,那么当消费者对消息消费处理的时候如果发生了异常呢,当然也需要处理一下异常了,一般我们在监听方法中,只是监听topic中的数据并消费,如果再try catch捕获并处理的话,则会显得代码块非常臃肿不利于维护,当然可以使用springboot提供的@ControllerAdvice注解来统一的异常处理,其实spring-kafka为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理consumer在消费时发生的异常。使用异常处理器也很简单,只要注册异常处理器并在监听器中引用使用即可,具体使用方式如下记录:


一、注册异常处理器

/**
 * 自定义消费者异常处理器
 */
@Configuration
public class CustomListenerErrorHandler {

    @Bean
    public ConsumerAwareListenerErrorHandler listenerErrorHandler(){
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
                System.out.println("--- 消费时发生异常 ---");
                return null;
            }
        };
    }

}


二、使用异常处理器

异常处理器注册好之后,将这个异常处理器的BeanName(默认就是方法名)放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

    @KafkaListener(topics = {"mytopic"},errorHandler = "listenerErrorHandler")
    public void consumer(List<String> message){
        System.out.println("接收到的消息:" + message);
        // 模拟消费时发生异常
        message.forEach(m ->{
            if(m.equals("prefix:msg-10")){
                throw new RuntimeException("fail");
            }
        });
    }


三、测试下

还是用之前的测试方法:

    @Transactional
    @GetMapping("/send13")
    public void test13() {
        for (int i = 0; i < 23; i++) {
            kafkaTemplate.send(topic, "msg-" + i);
        }
    }

启动项目,访问

http://localhost:8080/send13

结果如下:

如上,可以看到异常处理器已经能正常使用了。



版权声明:本文为H900302原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。