自动提交方案
示例代码:
from kafka import KafkaConsumer
if __name__ == '__main__':
# 1.创建消费者对象
consumer = KafkaConsumer(
'test_kafka',
bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
group_id='g_01',
enable_auto_commit=True, # 是否自动提交偏移量信息
auto_commit_interval_ms=1, # 每隔多长时间提交一次偏移量信息
# auto_offset_reset='earliest' # 偏移量默认读取位置
)
# 2.获取消息数据
for msg in consumer:
topic = msg.topic
partition = msg.partition
offset = msg.offset
key = msg.key
value = msg.value
print(f'topic为:{topic},分片为:{partition},偏移量为:{offset}, 消息key为:{key}, 消息value为:{value}')
手动提交方案
除非对偏移量信息需要严格把控,建议自己维护偏移量信息,进行处理即可!
示例代码:
from kafka import KafkaConsumer
if __name__ == '__main__':
# 1.创建消费者对象
consumer = KafkaConsumer(
'test_kafka',
bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
group_id='g_03',
enable_auto_commit=False, # 是否自动提交偏移量信息
# auto_commit_interval_ms=1, # 每隔多长时间提交一次偏移量信息
# auto_offset_reset='earliest' # 偏移量默认读取位置
)
# 2.获取消息数据
for msg in consumer:
topic = msg.topic
partition = msg.partition
offset = msg.offset
key = msg.key
value = msg.value
print(f'topic为:{topic},分片为:{partition},偏移量为:{offset}, 消息key为:{key}, 消息value为:{value}')
# 手动提交偏移量信息:当选择手动的时候,千万不能丢失以下操作,否则会导致大量的重复消费处理
# consumer.commit() # 同步提交
consumer.commit_async() # 异步提交(推荐)
版权声明:本文为weixin_44799217原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。