我可以用来KafkaConsumer在单独的线程中使用消息。
但是,当我使用multiprocessing.Process而不是时threading.Thread,出现错误:
OSError: [Errno 9] Bad file descriptor
该问题和文档表明使用多处理并行使用消息是可能的。有人可以分享一个有效的例子吗?
编辑
这是一些示例代码。抱歉,原始代码太复杂了,因此我在这里创建了一个示例,希望可以传达正在发生的事情。如果我使用threading.Thread而不是,此代码可以正常工作multiprocessing.Process。
from multiprocessing import Process
class KafkaWrapper():
def __init__(self):
self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')
def consume(self, topic):
self.consumer.subscribe(topic)
for message in self.consumer:
print(message.value)
class ServiceInterface():
def __init__(self):
self.kafka_wrapper = KafkaWrapper()
def start(self, topic):
self.kafka_wrapper.consume(topic)
class ServiceA(ServiceInterface):
pass
class ServiceB(ServiceInterface):
pass
def main():
serviceA = ServiceA()
serviceB = ServiceB()
jobs=[]
# The code works fine if I used threading.Thread …Run Code Online (Sandbox Code Playgroud) python python-multithreading apache-kafka python-multiprocessing