python-kafka客户端封装

  • Post author:
  • Post category:python





前言

本文对 Python 的 kafka-python 包做简单封装，方便 Kafka 初学者使用。安装依赖包：

pip install kafka-python



封装代码

kafka_helper.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import List


class KProducer:
    """Thin wrapper around ``kafka.KafkaProducer`` with sync / async send helpers.

    By default both keys and values are serialized with ``json.dumps(...)``
    encoded as ASCII bytes. Pass ``key_serializer=None`` /
    ``value_serializer=None`` to send values that are already ``bytes``.
    """

    def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
        """
        Create the underlying KafkaProducer.

        :param bootstrap_servers: list of "host:port" broker addresses
        :param key_serializer: callable turning a key into bytes, or None to pass keys through unchanged
        :param value_serializer: callable turning a value into bytes, or None to pass values through unchanged
        :param compression_type: None, "gzip", "lz4", "snappy" or "zstd"
        :raises Exception: if the producer cannot be created (original error chained as __cause__)
        """
        # NOTE(review): the send helpers below never pass a key. If kafka-python
        # applies key_serializer to a None key, the default serializer produces
        # b"null" and every record would hash to the same partition -- verify.
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,  # 32 MiB
                batch_size=1048576,  # 1 MiB
                max_request_size=1048576,  # 1 MiB
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type  # message compression: gzip / lz4 / snappy / zstd
            )
        except Exception as e:
            raise Exception("connect kafka failed, {}.".format(e)) from e
        print("connect success, kafka producer info {0}".format(bootstrap_servers))

    def sync_send(self, topic: str, data):
        """
        Send one record and wait (up to 10 s) for the send result.

        :param topic: destination topic
        :param data: record value (passed through value_serializer)
        :return: (partition, offset) at which the record was stored
        :raises Exception: if the send fails or does not complete within 10 s
        """
        try:
            future = self.producer.send(topic, data)
            record_metadata = future.get(timeout=10)  # block until the send completes
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e)) from e
        partition = record_metadata.partition  # partition the record landed in
        offset = record_metadata.offset  # position of the record within that partition
        print("save success, partition: {}, offset: {}".format(partition, offset))
        return partition, offset

    def async_send(self, topic: str, data):
        """
        Queue one record for sending without waiting for the result.

        :param topic: destination topic
        :param data: record value (passed through value_serializer)
        :return: None
        :raises Exception: if the producer rejects the record immediately
        """
        try:
            self.producer.send(topic, data)
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e)) from e
        print("send data:{}".format(data))

    def async_callback(self, topic: str, data):
        """
        Queue every item of ``data`` asynchronously, reporting each outcome via callbacks.

        :param topic: destination topic
        :param data: iterable of record values
        :return: None
        :raises Exception: if queueing or the final flush fails
        """
        try:
            for item in data:
                future = self.producer.send(topic, item)
                future.add_callback(self.__send_success).add_errback(self.__send_error)
            # Flush once after everything is queued so records can be batched;
            # flushing after every item blocked on each record individually.
            self.producer.flush()
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e)) from e

    @staticmethod
    def __send_success(record_metadata):
        """Success callback; kafka-python passes the send result as the sole argument."""
        print("save success")

    @staticmethod
    def __send_error(exc):
        """Error callback; kafka-python passes the failing exception as the sole argument."""
        print("save error")

    def close(self):
        """Close the underlying KafkaProducer."""
        self.producer.close()


class KConsumer:
    """Thin wrapper around ``kafka.KafkaConsumer`` bound to a single topic.

    Auto-commit is disabled, so offsets are only committed if the caller does
    so explicitly (for example via ``get_consumer().commit()``).
    """

    def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                 value_deserializer=None, auto_offset_reset="latest"):
        """
        Create the consumer and subscribe it to ``topic``.

        :param bootstrap_servers: list of "host:port" broker addresses
        :param topic: topic to subscribe to
        :param group_id: consumer group id
        :param key_deserializer: callable turning key bytes into an object, or None for raw bytes
        :param value_deserializer: callable turning value bytes into an object, or None for raw bytes
        :param auto_offset_reset: starting position when no committed offset exists ("latest" / "earliest")
        :raises Exception: if the consumer cannot be created (message includes the traceback)
        """
        self.topic = topic
        try:
            # Passing the topic positionally subscribes the consumer to it, so no
            # separate subscribe() call is needed.
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,  # has no effect while enable_auto_commit is False
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset
            )
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() returned None,
            # which put the literal "None" into the message.
            raise Exception("Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.format_exc())) from e
        print("connect to kafka and subscribe topic success")

    def get_consumer(self):
        """
        Return the underlying KafkaConsumer, which can be iterated for messages.

        :return: consumer
        """
        return self.consumer

    def set_topic(self, topic: str):
        """
        Switch the subscription to ``topic``.

        Any manual partition assignment (see get_message_by_partition_offset) is
        cleared first, because subscribing while partitions are manually
        assigned raises an error.

        :param topic: topic to subscribe to
        :return: None
        """
        self.topic = topic
        self.consumer.unsubscribe()
        self.consumer.subscribe(topics=[self.topic])

    def get_message_by_partition_offset(self, partition, offset):
        """
        Return the next message of the current topic in ``partition``, starting at ``offset``.

        This drops the group subscription and manually assigns that single
        partition; call set_topic() to return to group-based consumption.
        NOTE(review): no consumer_timeout_ms is passed in __init__, so this
        presumably blocks until a message is available -- confirm.

        :param partition: partition number
        :param offset: offset within the partition
        :return: the consumed message, or None if iteration ends without one
        """
        self.consumer.unsubscribe()
        tp = TopicPartition(self.topic, partition)
        self.consumer.assign([tp])
        self.consumer.seek(tp, offset=offset)
        for message in self.consumer:
            return message
        return None

    def close(self):
        """Close the underlying KafkaConsumer."""
        self.consumer.close()



测试代码

kafka_test.py

from kafka_helper import KProducer,KConsumer
import json

def sync_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one demo record synchronously to ``topic``.

    :param bootstrap_servers: list of broker addresses
    :param topic: destination topic
    :param json_format: True to let KProducer JSON-encode the dict; False to
        disable its serializers and send pre-encoded UTF-8 JSON bytes
    """
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # KProducer.sync_send takes (topic, data) in that order.
        p.sync_send(topic, payload)
    finally:
        p.close()

def async_send_test(bootstrap_servers, topic, json_format=True):
    """
    Queue one demo record asynchronously to ``topic``, then close the producer.

    :param bootstrap_servers: list of broker addresses
    :param topic: destination topic
    :param json_format: True to let KProducer JSON-encode the dict; False to
        disable its serializers and send pre-encoded UTF-8 JSON bytes
    """
    value = {
        "send_type": "async_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # The method is KProducer.async_send (there is no asyn_send), and it
        # takes (topic, data) in that order.
        p.async_send(topic, payload)
    finally:
        p.close()

def consumer_test(bootstrap_servers, topic):
    """
    Consume ``topic`` from the earliest offset as group "test".

    For each record, print the raw value (bytes, since no deserializer is set)
    and its JSON-decoded form. Iteration has no configured timeout, so this
    presumably runs until interrupted.
    """
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test', auto_offset_reset="earliest")
    consumer = c.get_consumer()
    try:
        for data in consumer:
            print(type(data.value), data.value)
            print(json.loads(data.value))
    finally:
        # Close on any exit (Ctrl+C, a non-JSON payload, ...) instead of leaking the consumer.
        consumer.close()

def get_one_msg(bootstrap_servers, topic, partition, offset):
    """
    Print the message of ``topic`` found at ``partition`` / ``offset``.

    :param bootstrap_servers: list of broker addresses
    :param topic: topic to read from
    :param partition: partition number
    :param offset: offset within the partition
    """
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test', auto_offset_reset="earliest")
    consumer = c.get_consumer()
    try:
        msg = c.get_message_by_partition_offset(partition, offset)
        print(msg)
    finally:
        consumer.close()


if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # Producer tests: JSON-serialized payloads first, then pre-encoded bytes;
    # each round sends once synchronously and once asynchronously.
    for json_format in (True, False):
        sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
        async_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
    # Consumer test: reads the topic from the earliest offset.
    consumer_test(bootstrap_servers=bootstrap_servers, topic=topic)
    # To fetch a single message by position instead:
    # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)



参考


Kafka入门,这一篇就够了(安装,topic,生产者,消费者)



版权声明:本文为lady_killer9原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。