汇总Kafka手动提交与自动提交

  • Post author:
  • Post category:其他


自动提交

程序拉取消息后,满足要求后自动提交,无需程序开发者介入。

  1. 配置

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        //不自动启动
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //rebalance监听
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //设置自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自动提交时间间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //重置位移 从最新的或最旧的消息开始消费
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2.重复消费与丢消息的情景

2.1丢消息

在consumer拉取消息之后,达到提交时间AUTO_COMMIT_INTERBAL_MS_CONFIG之后位移(例如1-10)已经自动提交,若此时消息尚未消费完成时,且消费者挂掉了,此时尚未消费的消息会丢失。因为位移1-10已经提交,下次拉取消息会从位移10开始消费。注:提交的位移是下次消费的起点。

2.2重复消费

此情况多发生于AUTO_COMMIT_INTERBAL_MS_CONFIG时间配置过长的情况下。在consumer拉取消息之后,消费者已经完成消费,但此时还未达到AUTO_COMMIT_INTERBAL_MS_CONFIG,位移尚未提交,但消费者挂掉了。下次会重新拉取此批消息,重新处理,导致重复消费。

手动提交

手动提交,程序设计者自行控制提交位移的时间。可由spring自动提交或主动调用提交位移的方法主动提交。

1.配置

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //手动提交方式 手动调用api接口后马上提交
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //手动同步提交
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //自动提交设置为false
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2.手动提交方式及参数介绍

2.1同步手动提交

同步阻塞,提交函数有返回后继续执行;失败自动重试;

2.2异步手动提交

异步提交无需等待提交方法的返回;无失败重试机制。

原因:位移异步提交,可能出现位移2提交失败,位移3提交成功,若位移2尝试重新提交并成功了,会把位移从3更新成2,导致下次重新拉取消息3造成重复消费。

2.3手动提交参数

spring kafka:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 将自动提交设置为false;

  1. factory.getContainerProperties().setSyncCommits(true); 设置手动同步提交,或异步提交;

  1. factory.getContainerProperties().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);


ackMode参数:

* RECORD:当消息处理器返回处理结果后立即提交offset。
* BATCH:当poll()方法返回的所有消息都被处理后提交offset。(默认值)
* TIME:当poll()方法返回的所有消息都被处理且距离上一次提交offset的时间已经超过了ackTime设置的时间,则提交offset。
* COUNT:当poll()方法返回的所有消息都被处理且自上一次提交偏移量以来接收到了ackCount条记录,则提交offset。
* COUNT_TIME:与TIME和COUNT类似,但如果两种情况中有任意一种满足,则提交offset。
* MANUAL:消息监听器负责手动提交Acknowledgment对象。此后,与BATCH相同的语义将被应用。
* MANUAL_IMMEDIATE:当消息监听器调用Acknowledgment.acknowledge()方法时立即提交offset;

3.重复消费的情景

3.1重复消费

在消费者消费完消息1-消息10后,尚未调用提交位移的方法时,消费者挂掉了。当rebalance之后,此分区分配给新的消费者,会重复拉取消息1-消息10进行消费,从而导致重复消费问题。



版权声明:本文为qq_34505594原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。