小编Dev*_*ven的帖子

Python Kafka多进程与线程

我可以用来KafkaConsumer在单独的线程中使用消息。

但是,当我使用multiprocessing.Process而不是时threading.Thread,出现错误:

OSError: [Errno 9] Bad file descriptor

问题文档表明使用多处理并行使用消息是可能的。有人可以分享一个有效的例子吗?

编辑

这是一些示例代码。抱歉,原始代码太复杂了,因此我在这里创建了一个示例,希望可以传达正在发生的事情。如果我使用threading.Thread而不是,此代码可以正常工作multiprocessing.Process

from multiprocessing import Process

class KafkaWrapper():
    def __init__(self):
        self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')

    def consume(self, topic):
        self.consumer.subscribe(topic)
        for message in self.consumer:
            print(message.value)

class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)

class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass


def main():

    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs=[]
    # The code works fine if I used threading.Thread …
Run Code Online (Sandbox Code Playgroud)

python python-multithreading apache-kafka python-multiprocessing

2
推荐指数
1
解决办法
2692
查看次数