First, create the configuration the Consumer needs. At a minimum there are five settings:
- The address of the Kafka cluster.
- The deserializer for the Key of consumed Messages.
- The deserializer for the Value of consumed Messages.
- The Consumer Group the Consumer belongs to.
- The policy that determines where the Consumer starts fetching Messages.
```java
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
```
Then instantiate the Consumer with the configuration built above:
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
```
Then subscribe to a Topic with the Consumer's `subscribe(Collection<String> topics)` method:
```java
consumer.subscribe(Arrays.asList("first_topic"));
```
Finally, fetch the Messages from the Topic and write them to the log:
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Key: " + record.key() + ", Value: " + record.value());
        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
    }
}
```
The timeout passed to the Consumer's `poll(Duration timeout)` method controls how long a single poll blocks while waiting for data. Also recall the Consumer Poll Options subsection in the earlier Consumer chapter: the four configuration items it covers for how the Consumer fetches Messages can all be set in the Properties as well.
After starting the Java Consumer, the console shows output like the following:
```
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-0 to offset 23.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-1 to offset 24.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-2 to offset 21.
```
In the output above, the line `Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]` shows that this Consumer will fetch Messages from all Partitions of the `first_topic` Topic.
Now start a second Consumer in the same group as the first one and see what it prints:
```
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-2]
```
The newly started Consumer prints `Setting newly assigned partitions [first_topic-2]`, which means this new Consumer will only fetch Messages from a single Partition of the `first_topic` Topic.
Now go back to the first Consumer's console:
```
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1]
```
The new lines on the first Consumer's console are the key part. First, the line `Attempt to heartbeat failed since group is rebalancing` shows that Kafka automatically reassigns the Topic's Partitions among the Consumers in the Consumer Group. Then the line `Setting newly assigned partitions [first_topic-0, first_topic-1]` shows that the first Consumer will no longer fetch Messages from the `first_topic-2` Partition. This confirms the concepts covered in the Consumer Group subsection of the Consumer chapter.
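If you want to observe this rebalance from inside the application rather than only in the INFO logs, the Kafka client also accepts a `ConsumerRebalanceListener` when subscribing. The snippet below is only an illustrative sketch layered on the example above, not part of the original code:

```java
// A minimal sketch: log which Partitions are revoked and assigned during a rebalance.
// Reuses the consumer, topic name and logger from the example above.
consumer.subscribe(Arrays.asList("first_topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("Revoked partitions: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("Assigned partitions: " + partitions);
    }
});
```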
Java Consumer with Assign and Seek
What if we need a temporary Consumer that should not join any Consumer Group, needs to read a specific Partition of a Topic, and needs to start fetching from a specific Message Offset? Fortunately, Kafka provides an API for exactly this.
First, when building the configuration, we no longer specify a Consumer Group:
```java
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // latest, none
```
Then instantiate a `TopicPartition` with the Topic name and the Partition number, and hand it to the Consumer with the Consumer's `assign(Collection<TopicPartition> partitions)` method:
```java
TopicPartition topicPartition = new TopicPartition("first_topic", 0);
consumer.assign(Arrays.asList(topicPartition));
```
Next, specify the Message Offset to start from:
```java
long offset = 21L;
consumer.seek(topicPartition, offset);
```
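Putting the pieces together, the whole temporary Consumer could look like the sketch below. The class name, the bounded read loop that stops after a few Messages, and the final `close()` are illustrative additions of mine, not part of the original example; replace "IP:Port" with a real broker address:

```java
// A self-contained sketch of the assign-and-seek Consumer described above.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerAssignSeekSketch {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerAssignSeekSketch.class);

    public static void main(String[] args) {
        // No group.id: this Consumer does not join a Consumer Group.
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Assign one specific Partition and jump to Offset 21, as in the example.
        TopicPartition topicPartition = new TopicPartition("first_topic", 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seek(topicPartition, 21L);

        // Read a handful of Messages and exit; the limit of 5 is purely illustrative.
        int remaining = 5;
        while (remaining > 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + ", Value: " + record.value());
                logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                remaining--;
                if (remaining == 0) {
                    break;
                }
            }
        }
        consumer.close();
    }
}
```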
Running this Consumer produces output like the following:
```
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Fetch offset 21 is out of range for partition first_topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Resetting offset for partition first_topic-0 to offset 22.
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Key: null, Value: hello world!
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Partition: 0, Offset: 22
```
If we check with the Consumer Group CLI, we will find that this operation still ends up creating a temporary Consumer Group behind the scenes:
```
root@iZ2ze2booskait1cxxyrljZ:~# kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
consumer_group_1
KMOffsetCache-iZ2ze2booskait1cxxyrljZ
```