python 多进程 消费kafka_Python-Kafka多进程vs线程

  • Post author:
  • Post category:python


我可以使用KafkaConsumer在不同的线程中使用消息。在

但是,当我使用multiprocessing.Process而不是threading.Thread时,我得到一个错误:

OSError: [Errno 9] Bad file descriptor

这question和{a2}表明使用多处理并行地消费消息是可能的。有人能分享一个有效的例子吗?在

编辑

下面是一些示例代码。很抱歉,原始代码太复杂了,所以我在这里创建了一个示例,希望能传达正在发生的事情。如果我使用threading.Thread而不是multiprocessing.Process,那么这段代码可以正常工作。在from multiprocessing import Process

class KafkaWrapper():

def __init__(self):

self.consumer = KafkaConsumer(bootstrap_servers=’my.server.com’)

def consume(self, topic):

self.consumer.subscribe(topic)

for message in self.consumer:

print(message.value)

class ServiceInterface():

def __init__(self):

self.kafka_wrapper = KafkaWrapper()

def start(self, topic):

self.kafka_wrapper.consume(topic)

class ServiceA(ServiceInterface):

<



版权声明:本文为weixin_39938746原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。