spring-boot 版本 1.5.12
依赖使用 spring-kafka 1.3.3(对应 kafka-clients 版本 0.11.0.0,请使用与 Kafka 版本相对应的依赖)
<!-- Inherit from the Spring Boot 1.5.12.RELEASE starter parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<!-- Empty relativePath: resolve the parent from the repository, not from a local path. -->
<relativePath/>
</parent>
<!-- Spring for Apache Kafka, pinned explicitly to 1.3.3.RELEASE. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.3.RELEASE</version>
</dependency>
1、自定义监听工厂 (resources目录下面kafka.properties文件中定义对应参数)
kafka.bootstrapServers=127.0.0.1:9093
kafka.groupId=test
kafka.sessionTimeout=30000
kafka.maxPollRecords=50
kafka.autoOffsetReset=latest
#kafka.max.poll.interval.ms=
kafka.autoCommitIntervalMs=2000
kafka.consumerRequestTimeoutMs=320000
#消费者并发启动个数(对应分区个数)每个listener方法
kafka.concurrency=10
/**
 * Kafka consumer configuration.
 *
 * <p>The fields are bound from {@code kafka.properties} using the {@code kafka} prefix and are
 * used to build the batch listener container factory registered under the bean name
 * {@code kafkaListenerContainerFactory}.
 */
@Configuration
@PropertySource("kafka.properties")
@ConfigurationProperties(prefix = "kafka")
@Data
public class KonkaKafkaConfig {

    /** Value passed to the consumer's {@code bootstrap.servers} setting. */
    private String bootstrapServers;

    /** Value passed to the consumer's {@code group.id} setting. */
    private String groupId;

    /** Value passed to the consumer's {@code session.timeout.ms} setting. */
    private String sessionTimeout;

    /** Value passed to the consumer's {@code max.poll.records} setting (records per batch). */
    private String maxPollRecords;

    /** Value passed to the consumer's {@code auto.offset.reset} setting. */
    private String autoOffsetReset;

    /** Value passed to the consumer's {@code auto.commit.interval.ms} setting. */
    private String autoCommitIntervalMs;

    /** Value passed to the consumer's {@code request.timeout.ms} setting. */
    private String consumerRequestTimeoutMs;

    /** Concurrency handed to the listener container factory. */
    private Integer concurrency;

    /**
     * Builds the batch-mode listener container factory.
     *
     * @return a concurrent container factory with batch listening enabled and the configured
     *         concurrency
     */
    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        // The key type matches the StringDeserializer configured in consumerConfigs(). The value
        // type is whatever KafkaLogMessageDeSer produces, which is not visible here, so it is
        // left as Object rather than claiming a type the deserializer may not deliver.
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs()));
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }

    /**
     * Assembles the Kafka consumer properties from the bound fields.
     *
     * @return a mutable map of consumer configuration entries
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, consumerRequestTimeoutMs);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaLogMessageDeSer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Number of records per batch.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // MAX_POLL_INTERVAL_MS_CONFIG is intentionally not set here; per the original note, a
        // consumer that exceeds that interval between polls is treated as stuck.
        return props;
    }
}
2、监听器
@Component
public class KafkaListener {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaListener.class);
@KafkaListener(containerFactory = "kafkaListenerContainerFactory", topics = "ucenter")
public void consumerListener(List<ConsumerRecord> consumerRecords) {
if (consumerRecords.size() > 0) {
PartitionCounter.addCounter(consumerRecords.get(0).partition(), consumerRecords.size());
}
Iterator<ConsumerRecord> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
ConsumerRecord consumerRecord = iterator.next();
String key = consumerRecord.key().toString();
String value = consumerRecord.value().toString();
}
}
}
3、启动spring-boot容器即可
(参数详解看后面文章)
版权声明:本文为asd5629626原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。