python3实现一个kafka的consumer和producer

  • Post author:
  • Post category:python

一.先大概了解下kafka的原理(转载,是我看到的比较通俗易懂的版本了)

1.消息队列分类

(1)点对点

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。这里要注意:

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2)发布/订阅

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

2.kafka介绍

kafka是一个分布式的、分区的、多副本的、多订阅者的日志系统(分布式消息队列)。可同时支持点对点模式的消息队列和发布/订阅模式的消息队列。

kafka角色术语:

  • Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成
  • Topic:消息队列,不同的消息会被发送至不同的队列当中
  • Producer:消息生产者,就是向kafka broker发消息的客户端
  • Consumer:消息消费者,从kafka broker取消息的客户端
  • Consumer Group(CG):这是kafka用于实现一个topic消息广播(发给所有的consumer)和单播(发给某一个consumer)的手段。一个topic可以有多个CG,topic的每一条消息都会发送给每个CG,但CG只会把消息发送给该CG中的一个consumer。如果需要实现广播,只要将每个consumer配置一个独立的CG即可。而要实现单播则只要将所有的consumer放至同一个CG即可。用CG还可以将consumer进行自由的分组而不需要producer多次发送消息到不同的topic
  • Partition:Partition是物理上的概念。为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition的顺序将消息发给consumer,不保证一个topic的整体顺序
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找2049的位置,只要找到2048.kafka文件即可。第一个位置为00000000000.kafka

kafka特性:

  • 提供数据持久化,消息顺序写入磁盘,提升机械盘的读写性能
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息
  • 通过多副本的方式防止消息丢失
  • 支持消息的同步和异步发送
  • 消费状态保存在客户端
  • 数据迁移、扩容对用户透明
  • 定期删除机制,支持设定partitions的segment file保留时间。

Topic与Partition的关系

Kafka中的topic是以partition的形式存放的,每个topic都可以设置它的partition数量,推荐partition的数量要大于同时运行的consumer的数量,也建议partition的数量大于集群broker的数量,这样消息数据就可以均匀的分布在各个broker中。

在存储结构上,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。parition命名规则为topic名称+序号,每一个partition序号从0开始,序号最大值为partitions数量减1。

每个partition中有多个大小相等的segment数据文件,每个segment的大小是相同的,但每个消息的大小可能不同,因此segment数据文件中消息的数量可能不相等。segement数据文件有两部分组成,分别为index file和data file,此两个文件是一一对应,对成出现,后缀分别为”.index”和”.log”。

每个patition有自己的replica,每个replica分布在不同的broker节点上,多个partition需要选举出leader partition,leader负责读写,并由zk负责fil over。

一个Topic配置多个patition可以将消息内容分散存放到多个broker上,这样就可以避免文件尺寸达到单机磁盘的上限,同时还可以保证消息存储、消费的效率,因为更多的patitions可以容纳更多的consumer,可有效提升kafka的吞吐率。

Consumer与Topic的关系

kafka作为分布式的消息系统支持多个producer和多个consumer,producer可以将消息分布到集群中不同节点的不同patition上,consumer也可以消费多个节点上的多个patition。在写消息时允许多个producer写到同一个partition中,但是读消息时,一个partition只允许被一个consumer group中的一个consumer所消费。而一个consumer可以消费多个patition。也就是说同一个consumer group下的consumer对partition是互斥的,而不同consumer group之间则是共享的。

通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效,那么其消费的partitions将会有其他consumer自动接管。而对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息

 

二.用python3实现一个producer(填你们自己需要的信息就好啦)

import json
from kafka import KafkaProducer
from kafka.errors import KafkaError


class KafkaClient(object):
    def __init__(self):
        self.sender = None
        self.config = None

    @staticmethod
    def on_send_success(record_metadata):
        #如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);
        print("Success:[{0}] send success".format(record_metadata))

    @staticmethod
    def on_send_error(excp):
        #相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。
        print("INFO" + "Fail:send fail cause:{0}".format(excp))

    def product(self,  kafka_config):
        #设置配置信息
        self.config = kafka_config
        #创建一个生产者
        self.sender = KafkaProducer(**self.config)

    def send(self, topic, value=None, key=None):
        #必须包含Topic和Value,key和partition可选。然后,序列化key和value对象为ByteArray,并发送到网络。
        future = self.sender.send(topic, value=value, key=key)
        try:
            record_metadata = future.get(timeout=10)
            self.on_send_success(record_metadata)
        except KafkaError as e:
            self.on_send_error(e)


if __name__ == '__main__':
    bootstrap_servers = ["", "", ""]
    sasl_plain_password = ""
    sasl_plain_username = ""
    kafka_config={"bootstrap_servers":bootstrap_servers,
                    "security_protocol":"SASL_PLAINTEXT",
                    "sasl_mechanism":'PLAIN',
                    "sasl_plain_username":sasl_plain_username,
                    "sasl_plain_password":sasl_plain_password,
                    "key_serializer":None,
                    "value_serializer":None,
                    "acks":0,
                    "compression_type":None,
                    "retries":0,
                    "batch_size":16384,
                    "linger_ms":0}

    data = json.dumps({
        "": ""
    })

    topic = ''

    client = KafkaClient()
    client.product(kafka_config)
    client.send(topic, value=bytes(data, encoding = "utf8"), key=None)

二.用python3实现一个consumer


import json
import threading
import time
import queue
from confluent_kafka.cimpl import Consumer, KafkaError
from kafka import KafkaConsumer



def kafka_consumer(topic, bootstrap_servers, sasl_plain_username, sasl_plain_password):
    consumer = KafkaConsumer(topic,
                             group_id='group-ha',
                             bootstrap_servers=bootstrap_servers,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username=sasl_plain_username,
                             sasl_plain_password=sasl_plain_password)
    for msg in consumer:
        time.sleep(5)
        consumer.commit()
        try:
            print(msg)
            data_in_kafka = json.loads(msg.value)
            print("=====json_in_kafka====", msg.value)
            print(data_in_kafka)
        except ValueError:
            print("=====msg_in_kafka====", data_in_kafka)
    print('==timeout')
    consumer.close()
    return data_in_kafka

 

 

 

 


版权声明:本文为qq_30758629原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。