自动提交
    
程序拉取消息后,满足要求后自动提交,无需程序开发者介入。
- 
配置
 
@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        //不自动启动
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //rebalance监听
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //设置自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自动提交时间间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //重置位移 从最新的或最旧的消息开始消费
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }
     2.重复消费与丢消息的情景
    
     2.1丢消息
    
在consumer拉取消息之后,达到提交时间AUTO_COMMIT_INTERBAL_MS_CONFIG之后位移(例如1-10)已经自动提交,若此时消息尚未消费完成时,且消费者挂掉了,此时尚未消费的消息会丢失。因为位移1-10已经提交,下次拉取消息会从位移10开始消费。注:提交的位移是下次消费的起点。
     2.2重复消费
    
此情况多发生于AUTO_COMMIT_INTERBAL_MS_CONFIG时间配置过长的情况下。在consumer拉取消息之后,消费者已经完成消费,但此时还未达到AUTO_COMMIT_INTERBAL_MS_CONFIG,位移尚未提交,但消费者挂掉了。下次会重新拉取此批消息,重新处理,导致重复消费。
     手动提交
    
手动提交,程序设计者自行控制提交位移的时间。可由spring自动提交或主动调用提交位移的方法主动提交。
     1.配置
    
@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //并发数量
        factory.setConcurrency(concurrency);
        //批量获取
        factory.setBatchListener(true);
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //手动提交方式 手动调用api接口后马上提交
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //手动同步提交
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //自动提交设置为false
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }
     2.手动提交方式及参数介绍
    
     2.1同步手动提交
    
同步阻塞,提交函数有返回后继续执行;失败自动重试;
     2.2异步手动提交
    
异步提交无需等待提交方法的返回;无失败重试机制。
原因:位移异步提交,可能出现位移2提交失败,位移3提交成功,若位移2尝试重新提交并成功了,会把位移从3更新成2,导致下次重新拉取消息3造成重复消费。
     2.3手动提交参数
    
spring kafka:
- 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 将自动提交设置为false; 
- 
factory.getContainerProperties().setSyncCommits(true); 设置手动同步提交,或异步提交; 
- 
factory.getContainerProperties().setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); 
     
      ackMode参数:
     
    
* RECORD:当消息处理器返回处理结果后立即提交offset。
* BATCH:当poll()方法返回的所有消息都被处理后提交offset。(默认值)
* TIME:当poll()方法返回的所有消息都被处理且距离上一次提交offset的时间已经超过了ackTime设置的时间,则提交offset。
* COUNT:当poll()方法返回的所有消息都被处理且自上一次提交偏移量以来接收到了ackCount条记录,则提交offset。
* COUNT_TIME:与TIME和COUNT类似,但如果两种情况中有任意一种满足,则提交offset。
* MANUAL:消息监听器负责手动提交Acknowledgment对象。此后,与BATCH相同的语义将被应用。
* MANUAL_IMMEDIATE:当消息监听器调用Acknowledgment.acknowledge()方法时立即提交offset;
     3.重复消费的情景
    
     3.1重复消费
    
在消费者消费完消息1-消息10后,尚未调用提交位移的方法时,消费者挂掉了。当rebalance之后,此分区分配给新的消费者,会重复拉取消息1-消息10进行消费,从而导致重复消费问题。
 
