前言
本文对 Python 的 kafka-python 包做简单封装,方便 Kafka 初学者使用。安装方式:
pip install kafka-python
封装代码
kafka_helper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import List
class KProducer:
    """Thin convenience wrapper around ``kafka.KafkaProducer``.

    Offers a blocking send, a fire-and-forget send, and a batched send that
    reports the outcome of every record through callbacks.
    """

    def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
        """
        Create the underlying producer.

        :param bootstrap_servers: list of broker addresses handed to KafkaProducer
        :param key_serializer: callable turning a key into bytes (JSON by default);
            pass None to send raw bytes
        :param value_serializer: callable turning a value into bytes (JSON by default);
            pass None to send raw bytes
        :param compression_type: compression codec name (gzip, lz4, snappy, zstd) or None
        :raises Exception: if the producer cannot be created
        """
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,  # 32 MiB
                batch_size=1048576,  # 1 MiB
                max_request_size=1048576,  # 1 MiB
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type,  # gzip, lz4, snappy or zstd
            )
            print("connect success, kafka producer info {0}".format(bootstrap_servers))
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise Exception("connect kafka failed, {}.".format(e)) from e

    def sync_send(self, topic: str, data):
        """
        Send one record and block until the send is acknowledged.

        :param topic: topic to send to
        :param data: record value
        :return: (partition, offset) of the stored record
        :raises Exception: if sending fails or no result arrives within 10 seconds
        """
        try:
            future = self.producer.send(topic, data)
            # Block on the future so the caller gets a definite result.
            record_metadata = future.get(timeout=10)
            partition = record_metadata.partition  # partition the record landed in
            offset = record_metadata.offset  # position of the record inside that partition
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e)) from e

    def async_send(self, topic: str, data):
        """
        Send one record without waiting for the result.

        :param topic: topic to send to
        :param data: record value
        :return: None
        :raises Exception: if handing the record to the producer fails
        """
        try:
            self.producer.send(topic, data)
            print("send data:{}".format(data))
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e)) from e

    def async_callback(self, topic: str, data):
        """
        Send several records asynchronously and report each outcome via callbacks.

        :param topic: topic to send to
        :param data: iterable of record values
        :return: None
        :raises Exception: if sending or flushing fails
        """
        try:
            for item in data:
                future = self.producer.send(topic, item)
                future.add_callback(self.__send_success).add_errback(self.__send_error)
            # Flush once after queuing everything so the records go out as a batch.
            self.producer.flush()
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e)) from e

    @staticmethod
    def __send_success(record_metadata=None):
        """Success callback; receives the record metadata from the send future."""
        # The parameter is required: the future invokes the callback with one argument.
        print("save success")
        return

    @staticmethod
    def __send_error(exception=None):
        """Error callback; receives the exception from the send future."""
        # The parameter is required: the future invokes the errback with one argument.
        print("save error")
        return

    def close(self):
        """Close the underlying producer."""
        self.producer.close()
class KConsumer:
    """Thin convenience wrapper around ``kafka.KafkaConsumer`` bound to one topic."""

    def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                 value_deserializer=None, auto_offset_reset="latest"):
        """
        Create the underlying consumer and subscribe it to ``topic``.

        :param bootstrap_servers: list of broker addresses handed to KafkaConsumer
        :param topic: topic to subscribe to
        :param group_id: consumer group id
        :param key_deserializer: callable turning key bytes into an object, or None for raw bytes
        :param value_deserializer: callable turning value bytes into an object, or None for raw bytes
        :param auto_offset_reset: offset reset policy passed to KafkaConsumer, e.g. "latest" or "earliest"
        :raises Exception: if the consumer cannot be created or subscribed
        """
        self.topic = topic
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset
            )
            self.consumer.subscribe(topics=[self.topic])
            print("connect to kafka and subscribe topic success")
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() only prints it
            # and returns None, which used to put a literal "None" in the message.
            raise Exception(
                "Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.format_exc())
            ) from e

    def get_consumer(self):
        """
        Return the underlying consumer so the caller can iterate over it.

        :return: consumer
        """
        return self.consumer

    def set_topic(self, topic: str):
        """
        Subscribe to a different topic.

        :param topic: topic to subscribe to
        :return: None
        """
        self.topic = topic
        self.consumer.subscribe(topics=[self.topic])

    def get_message_by_partition_offset(self, partition, offset):
        """
        Fetch a single message identified by partition and offset.

        :param partition: partition number within the current topic
        :param offset: offset of the message inside the partition
        :return: the first message yielded after seeking; None if iteration ends without one
        """
        # Drop the subscription first, then assign the partition manually.
        self.consumer.unsubscribe()
        topic_partition = TopicPartition(self.topic, partition)
        self.consumer.assign([topic_partition])
        self.consumer.seek(topic_partition, offset=offset)
        for message in self.consumer:
            return message
测试代码
kafka_test.py
from kafka_helper import KProducer,KConsumer
import json
def sync_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one sample record synchronously.

    :param bootstrap_servers: list of broker addresses
    :param topic: topic to send to
    :param json_format: if True use KProducer's default JSON serializers;
        otherwise disable the serializers and send pre-encoded UTF-8 JSON bytes
    :return: None
    """
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # KProducer.sync_send takes (topic, data); the original call had them swapped.
        p.sync_send(topic, payload)
    finally:
        # Close the producer even when the send fails.
        p.close()
def async_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one sample record asynchronously.

    :param bootstrap_servers: list of broker addresses
    :param topic: topic to send to
    :param json_format: if True use KProducer's default JSON serializers;
        otherwise disable the serializers and send pre-encoded UTF-8 JSON bytes
    :return: None
    """
    value = {
        "send_type": "async_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # The method is async_send (not asyn_send) and takes (topic, data).
        p.async_send(topic, payload)
    finally:
        # Close the producer even when the send fails.
        p.close()
def consumer_test(bootstrap_servers, topic):
    """
    Consume the topic from the earliest offset and print every record.

    For each record the raw value (with its type) is printed first, followed
    by the value parsed as JSON.

    :param bootstrap_servers: list of broker addresses
    :param topic: topic to consume
    :return: None
    """
    wrapper = KConsumer(
        bootstrap_servers=bootstrap_servers,
        topic=topic,
        group_id='test',
        auto_offset_reset="earliest",
    )
    for record in wrapper.get_consumer():
        payload = record.value
        print(type(payload), payload)
        print(json.loads(payload))
def get_one_msg(bootstrap_servers, topic, partition, offset):
    """
    Fetch and print the single message at the given partition and offset.

    :param bootstrap_servers: list of broker addresses
    :param topic: topic to read from
    :param partition: partition number
    :param offset: offset inside the partition
    :return: None
    """
    wrapper = KConsumer(
        bootstrap_servers=bootstrap_servers,
        topic=topic,
        group_id='test',
        auto_offset_reset="earliest",
    )
    print(wrapper.get_message_by_partition_offset(partition, offset))
if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # Producer tests: JSON-serialized first, then pre-encoded bytes;
    # each round does a sync send followed by an async send.
    for json_format in (True, False):
        sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
        async_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
    # Consumer test
    consumer_test(bootstrap_servers=bootstrap_servers, topic=topic)
    # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)
参考
版权声明:本文为lady_killer9原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。