SpringBoot整合Kafka下

  • Post author:
  • Post category:其他


当消费端是批量接收消息,配置中的自动提交需要关闭,同时要把手动提交打开

在这里插入图片描述

kafka:
    ###########【Kafka集群】###########
    bootstrap-servers: 192.168.188.128:9092
    producer:
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
#      properties:
#        #自定义分区器
#        partitioner.class: com.example.demo.kafka.CustomizePartitioner

    consumer:
      group-id: javagroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
      # 批量消费每次最多消费多少条消息
    listener:
      type: batch
      ack-mode: manual_immediate

注意:批量消费和批量发送是无关的,可以一条条发送批量消费也可以批量发送批量消费

当批量发送时:

Producer:

在这里插入图片描述

在这里插入图片描述

代码如下:

@Data
public class KafkaMessage {
    public Integer index;
    public String id;
    public String value;
}

@GetMapping("BatchSend")
    public void sendProducerRecord(){
        for(int i=0; i<100; i++) {
            KafkaMessage kafkaMessage = new KafkaMessage();
            kafkaMessage.setIndex(i);
            kafkaMessage.setId(UUID.randomUUID().toString());
            kafkaMessage.setValue("producerRecord " + i);
            ProducerRecord<String, Object> producerRecord =
                    new ProducerRecord<>("topic1", "key1", JSONObject.toJSONString(kafkaMessage));
            kafkaTemplate.send(producerRecord);
        }
    }

Consumer:

在这里插入图片描述

代码:

@KafkaListener(id = "consumer1",groupId = "javagroup", topics = "topic1")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        System.out.println(">>>批量消费一次,records.size()="+records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println("****"+record.value());
        }
    }

ApiPost发送请求:

在这里插入图片描述

在这里插入图片描述

当Producer发送一条消息,Consumer批量消费时:

Producer:

在这里插入图片描述

    @GetMapping("Producer")
    public void sendMessage(String normalMessage){
        kafkaTemplate.send("topic1",normalMessage);
    }

Consumer不变

ApiPost发送请求:

在这里插入图片描述

在这里插入图片描述

注意:如果把配置文件里批量消费关掉

在这里插入图片描述

但是Consumer还是批量接收时,会报错:

在这里插入图片描述

因此批量消费时务必修改配置文件中的批量监听以及手动提交

当配置文件中批量监控以及手动提交开启

在这里插入图片描述

而Consumer为简单消费时:

在这里插入图片描述

无论批量发送还是简单发送都会报错;

在这里插入图片描述

总结:

配置关闭,批量消费报错

配置打开,简单消费报错

因此配置和消费要保持一致

当Producer批量发送,配置文件批量监听手动提交都关闭,而Consumer为单体消费时:

在这里插入图片描述

@KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?,?>record){
        System.out.println("简单消费: "+record.topic()+"_"+record.partition()+"_"+"_"+record.value());
    }

在这里插入图片描述

二.回调:

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

在这里插入图片描述

//    带回掉的Producer  .addCallback(SuccessCallback<>,FailureCallback<>)
    @GetMapping("CallBackOne")
    public void sendMessage2(String callBackMessage){
        kafkaTemplate.send("topic1",callBackMessage).addCallback(successCallBack->{
            String topic = successCallBack.getRecordMetadata().topic();
            int partition = successCallBack.getRecordMetadata().partition();
            long offset = successCallBack.getRecordMetadata().offset();
            System.out.println("发送消息成功: "+topic+"-"+partition+"-"+offset);
        },failureCallBack->{
            System.out.println("发送消息失败:"+failureCallBack.getMessage());
        });
    }

在这里插入图片描述

第二种写法:

在这里插入图片描述

//带回调的Producer addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {})
    @GetMapping("CallBackTwo")
    public void sendMessage3(String callBackMessage){
        kafkaTemplate.send("topic2",callBackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败: "+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功: "+"-"+result.getRecordMetadata().offset());
            }
        });
    }

在这里插入图片描述

补充:全局回调

注意:这样设置以后,该单例的kafkaTemplate就有了一个回调,注意是全局回调,只要一个实例中设置,全局生效

在这里插入图片描述

@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        System.out.println("Message send success : " + producerRecord.toString());

    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
        System.out.println("Message send error : " + producerRecord.toString());
    }
}
    @Autowired(required = false)
    private KafkaSendResultHandler producerListener;
    //全局回调
    @GetMapping("GlobalCallBack")
    public void testProducerListen() throws InterruptedException {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("topic1", "test producer listen");
        Thread.sleep(1000);
    }

在这里插入图片描述

此时如果调用另一个接口:

在这里插入图片描述

在这里插入图片描述

此接口虽然未设置回调,但由于是全局回调,所以回调依然生效

在这里插入图片描述

三:自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 得到 topic 的 partitions 信息
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        System.out.println("partitionInfoList"+partitionInfoList);
        System.out.println("topic: "+topic+"-"+"key: "+key.toString()+"-"+"value: "+value.toString());
        int size = partitionInfoList.size();
        System.out.println("分区有: "+size+"个");
        //模拟某客服
        if (key.toString().equals("10000")||key.toString().equals("11111")){
            //放到最后一个分区中
            return size-1;
        }
        String phoneNum= key.toString();
        return phoneNum.substring(0,3).hashCode()%(size-1);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

编写一个Kafka生产者来使用自定义分区器:

/**
 * @program:demo
 * @description:测试自定义分区器Producer
 * @auth0r: SYP
 * @create: Creted on 2022-05-16 11:38
 **/
public class PartitionerProducer{
    private static final String[] PHONE_NUMS=new String[]{"10000", "10000", "11111", "13700000003", "13700000004",
            "10000", "15500000006", "11111", "15500000008",
            "17600000009", "10000", "17600000011"
    };

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //定义配置信息
        Properties props = new Properties();
        //kafka地址,多个地址用逗号分隔
        props.put("bootstrap.servers","192.168.188.128:9092");
        props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
        props.put("retries", 3);// 请求失败重试的次数
        props.put("batch.size", 16384);// batch的大小
        props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
        props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量

        //设置分区器
        props.put("partitioner.class","com.example.demo.kafka.CustomizePartitioner");
        //设置序列化类,可以写类的全路径
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int count=0;
        int length=PHONE_NUMS.length;

        while (count<10){
            Random rand = new Random();
            String phoneNum=PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("topic4", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            String result="phoneNum ["+record.value()+"] has been sent to partition " +metadata.partition();
            System.out.println(result);
            Thread.sleep(500);
            count++;
        }
        producer.close();
    }
}

在这里插入图片描述

编写一个消费者来消费数据:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @program:demo
 * @description:自定义分区器Consumer
 * @auth0r: SYP
 * @create: Creted on 2022-05-18 09:42
 **/
public class PartitionerConsumer {
    private static KafkaConsumer<String,String> consumer;
    private final static String TOPIC="topic4";

    public PartitionerConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.188.128:9092");
        //每个消费者分配独立的组号
        props.put("group.id","javagroup");
        //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "true");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "30000");
        //自动重置offset
        //earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
        //latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer=new KafkaConsumer<String,String>(props);
    }

    public void consume(){
        //消费消息
        consumer.subscribe(Arrays.asList(TOPIC));
        try {
            while (true){
                //拉取records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String,String> record: records){
                    System.out.printf("消费端"+"partition=%d, offset=%d, key=%s, value=%s",record.partition(),
                            record.offset(),record.key(),record.value());
                    System.out.println();
                }
            }
        }finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new PartitionerConsumer().consume();
    }
}

我们可以看一下

在这里插入图片描述

topic4下面有3个partition

分别运行生产者和消费者代码并查看测试结果:

在这里插入图片描述

四:kafka事务提交:

如果在发送时需要创建事务.可以使用kafkaTemplate的executeIntransaction方法来声明事务:

    @GetMapping("transaction")
    public void snedMessage7(){

        // 声明事务:后面报错消息不会发出去
//        kafkaTemplate.executeInTransaction(operationsCallBack->{
//            operationsCallBack.send("topic1","test executeInTransaction");
//            throw new RuntimeException("fail");
//        });
        //不声明事务:后面报错但前面消息已经发送成功了
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    }

调用接口可以发现不声明事务时,后面报错但前面消息已经发送成功了

在这里插入图片描述

声明事务:

@GetMapping("transaction")
    @Transactional
    public void snedMessage7(){

        // 声明事务:后面报错消息不会发出去
        kafkaTemplate.executeInTransaction(operationsCallBack->{
            operationsCallBack.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
        //不声明事务:后面报错但前面消息已经发送成功了
//        kafkaTemplate.send("topic1","test executeInTransaction");
//        throw new RuntimeException("fail");
    }

调用接口报错:

Producer factory does not support transactions

在这里插入图片描述

这是因为:使用kafka事务发送消息的时候没有开启事务

需要在配置里打开事务:

在这里插入图片描述

继续调用报错:

Could not create Kafka transaction; nested exception is org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

在这里插入图片描述

修改配置:

在这里插入图片描述

继续调用报错如下:

Could not create Kafka transaction; nested exception is org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

在这里插入图片描述

修改配置:

在这里插入图片描述

继续调用报错如下:

在这里插入图片描述

Failing batch since transaction was aborted 这说明事务生效

继续调用

在这里插入图片描述

No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

需要添加@Transactional注解




:消费者

1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供,

    /*监听topic4的0号分区,同时监听topic1的0号分区和topic2的1号分区里面offset从8开始的消息。
    注意:topics和topicPartitions不能同时使用;*/
    @KafkaListener(id = "consumer1",groupId = "javagroup",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic4", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
    })
    public void onMessage2(ConsumerRecord<?, ?> record) {
        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
    }

运行PartitionerProducer后

在这里插入图片描述

之所以没有topic1的,是因为我们没有向topic1发送消息

属性解释:

① id:消费者ID;

② groupId:消费组ID;

③ topics:监听的topic,可监听多个;

④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

注意:topics和topicPartitions不能同时使用;

2.ConsumerAwareListenerErrorHandler 异常处理器

通过异常处理器我们可以处理consumer在消费时发生的异常

新建一个ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;

/**
 * @program:demo
 * @description:
 * @auth0r: SYP
 * @create: Creted on 2022-05-17 11:21
 **/
@Component
public class KafkaErrorHnadler {
     新建一个异常处理器,用@Bean注入
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){
        return(message,exception,consumer)->{
            System.out.println("消费异常: "+message.getPayload());
            return null;
        };
    }
}
    //将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,
    @KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
    public void onMessage4(ConsumerRecord<?,?> record) throws Exception {
        throw new Exception("简单消费-模拟异常");
    }

    // 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
    @KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消费一次...");
        throw new Exception("批量消费-模拟异常");
    }

执行看效果:

在这里插入图片描述

在这里插入图片描述

3.消息过滤器

消息过滤器可以在消息抵达Consumer之前被拦截,在实际应用中,我们可以根据业务需要筛选出需要的信息再交由KafkaListener处理,不需要的消息过滤掉

配置消息过滤只需要为监听器工厂配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息会被过滤掉,返回false时消息能正常抵达监听器

@Component
public class KafkaListenerFilter {
    @Autowired
    ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        //消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            return true;
        });
        return factory;
    }
}
    //消息过滤监听
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?,?> record){
        System.out.println(record.value());
    }
    //消息过滤器
   @GetMapping("KafkaFilter")
   public void sendMessage6(){
       for (int i=1;i<=100;i++){
           kafkaTemplate.send("topic1",i+"");
       }
   }

在这里插入图片描述

4.消息转发

在实际开发中,我们可能有这样的需求,应用A从topicA获取消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后转发至其他应用

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下,

    /**
     * @Description: 从topic1接收到的消息经过处理后转发到topic2,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容
     * @Param: null
     * @return:
     * @Author: SYP
     * @date: 2022/5/17
     */
    @KafkaListener(topics = {"topic1"})
    @SendTo("topic2")
    public String onMessage7(ConsumerRecord<?,?> record){
        System.out.println("topic: "+record.topic()+" partion: "+record.partition()+" offset: "+record.offset()+" value: "+record.value());
        return record.value()+"-forward message";
    }


    @KafkaListener(topics = {"topic2"})
    public void onMessage8(ConsumerRecord<?,?> record){
        System.out.println("topic: "+record.topic()+" partion: "+record.partition()+" offset: "+record.offset()+" value: "+record.value());
    }

在这里插入图片描述

6.定时启动.停止监听器

默认情况下,当消费者项目启动时,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢

—-使用KafkaListenerEndpointRegistry,下面我们就来实现:

① 禁止监听器自启动;

② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry在SpringIO中已经被注册为bean,直接注入,设置禁止KafkaListener自启动,

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @program:demo
 * @description:
 * @auth0r: SYP
 * @create: Creted on 2022-05-17 14:45
 **/
@EnableScheduling
@Component
public class CronTimer {
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired(required = false)
    private KafkaListenerEndpointRegistry registry;
    @Autowired
    private ConsumerFactory consumerFactory;

    @KafkaListener(id = "timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?,?> record){
        System.out.println("消费成功: "+record.topic()+"-"+record.partition()+"-"+record.offset()+"-"+record.value());
    }

    //定时启动监听器
    @Scheduled(cron = "0 47 15 * * ? ")
    public void startListener(){
        System.out.println("启动监听器...");
        if (!registry.getListenerContainer("timingConsumer").isRunning()){
            registry.getListenerContainer("timingConsumer").start();
        }
    }

    @Scheduled(cron = "0 48 15 * * ?")
    public void shutdownListener1(){
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }

}
	@Autowired
    ConsumerFactory consumerFactory;
    //     监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory(){
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止kafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @program:demo
 * @description:
 * @auth0r: SYP
 * @create: Creted on 2022-05-17 14:45
 **/
@EnableScheduling
@Component
public class CronTimer {
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired(required = false)
    private KafkaListenerEndpointRegistry registry;
    @Autowired
    private ConsumerFactory consumerFactory;

    @KafkaListener(id = "timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?,?> record){
        System.out.println("消费成功: "+record.topic()+"-"+record.partition()+"-"+record.offset()+"-"+record.value());
    }

    //定时启动监听器
    @Scheduled(cron = "0 34 15 * * ? ")
    public void startListener(){
        System.out.println("启动监听器...");
        if (!registry.getListenerContainer("timingConsumer").isRunning()){
            registry.getListenerContainer("timingConsumer").start();
        }
    }

    @Scheduled(cron = "0 35 15 * * ?")
    public void shutdownListener1(){
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }

}
    @GetMapping("CronTimer")
    public void sendMessage8() throws InterruptedException {
        System.out.println("定时任务开启");
        int i=0;
        while (true){
            kafkaTemplate.send("topic1",i+"");
            i++;
            Thread.sleep(3000);
        }
    }

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



版权声明:本文为shangyupeng_原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。