我可以使用KafkaConsumer在不同的线程中使用消息。在
但是,当我使用multiprocessing.Process而不是threading.Thread时,我得到一个错误:
OSError: [Errno 9] Bad file descriptor
这question和{a2}表明使用多处理并行地消费消息是可能的。有人能分享一个有效的例子吗?在
编辑
下面是一些示例代码。很抱歉,原始代码太复杂了,所以我在这里创建了一个示例,希望能传达正在发生的事情。如果我使用threading.Thread而不是multiprocessing.Process,那么这段代码可以正常工作。在from multiprocessing import Process
class KafkaWrapper():
def __init__(self):
self.consumer = KafkaConsumer(bootstrap_servers=’my.server.com’)
def consume(self, topic):
self.consumer.subscribe(topic)
for message in self.consumer:
print(message.value)
class ServiceInterface():
def __init__(self):
self.kafka_wrapper = KafkaWrapper()
def start(self, topic):
self.kafka_wrapper.consume(topic)
class ServiceA(ServiceInterface):
<