Kafka消费者如何提交偏移量信息(自动和手动)

  • Post author:
  • Post category:其他

自动提交方案

示例代码:

from kafka import KafkaConsumer

if __name__ == '__main__':
    # 1.创建消费者对象
    consumer = KafkaConsumer(
        'test_kafka',
        bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
        group_id='g_01',
        enable_auto_commit=True,  # 是否自动提交偏移量信息
        auto_commit_interval_ms=1,  # 每隔多长时间提交一次偏移量信息
        # auto_offset_reset='earliest'  # 偏移量默认读取位置
    )

    # 2.获取消息数据
    for msg in consumer:
        topic = msg.topic
        partition = msg.partition
        offset = msg.offset
        key = msg.key
        value = msg.value
        print(f'topic为:{topic},分片为:{partition},偏移量为:{offset}, 消息key为:{key}, 消息value为:{value}')

手动提交方案

        除非对偏移量信息需要严格把控,建议自己维护偏移量信息,进行处理即可!

示例代码:

from kafka import KafkaConsumer

if __name__ == '__main__':
    # 1.创建消费者对象
    consumer = KafkaConsumer(
        'test_kafka',
        bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
        group_id='g_03',
        enable_auto_commit=False,  # 是否自动提交偏移量信息
        # auto_commit_interval_ms=1,  # 每隔多长时间提交一次偏移量信息
        # auto_offset_reset='earliest'  # 偏移量默认读取位置
    )

    # 2.获取消息数据
    for msg in consumer:
        topic = msg.topic
        partition = msg.partition
        offset = msg.offset
        key = msg.key
        value = msg.value
        print(f'topic为:{topic},分片为:{partition},偏移量为:{offset}, 消息key为:{key}, 消息value为:{value}')

        # 手动提交偏移量信息:当选择手动的时候,千万不能丢失以下操作,否则会导致大量的重复消费处理
        # consumer.commit()  # 同步提交
        consumer.commit_async()  # 异步提交(推荐)

版权声明:本文为weixin_44799217原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。