Python Kafka multiprocessing vs threading

Dev*_*ven 2 python python-multithreading apache-kafka python-multiprocessing

I can use KafkaConsumer to consume messages in a separate thread.

However, when I use multiprocessing.Process instead of threading.Thread, I get this error:

OSError: [Errno 9] Bad file descriptor

The documentation suggests that consuming messages in parallel with multiprocessing is possible. Could someone share a working example?

Edit

Here is some sample code. Sorry, my original code is too complex, so I wrote this example, which I hope conveys what is going on. It works fine if I use threading.Thread instead of multiprocessing.Process.

from multiprocessing import Process

from kafka import KafkaConsumer

class KafkaWrapper():
    def __init__(self):
        self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')

    def consume(self, topic):
        self.consumer.subscribe(topic)
        for message in self.consumer:
            print(message.value)

class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)

class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass


def main():

    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs=[]
    # The code works fine if I used threading.Thread here instead of Process
    jobs.append(Process(target=serviceA.start, args=("my-topic",)))
    jobs.append(Process(target=serviceB.start, args=("my-topic",)))

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

if __name__ == "__main__":
    main()

Here is the error I see (again, my actual code differs from the example above; it works fine with threading.Thread but fails with multiprocessing.Process):

Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "service_interface.py", line 58, in start
    self._kafka_wrapper.start_consuming(self.service_object_id)
  File "kafka_wrapper.py", line 141, in start_consuming
    for message in self._consumer:
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
    return next(self._iterator)
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
    self._client.poll(timeout_ms=poll_ms, sleep=True)
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
    ready = self._selector.select(timeout)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
OSError: [Errno 9] Bad file descriptor

Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "service_interface.py", line 58, in start
    self._kafka_wrapper.start_consuming(self.service_object_id)
  File "kafka_wrapper.py", line 141, in start_consuming
    for message in self._consumer:
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
    return next(self._iterator)
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
    self._client.poll(timeout_ms=poll_ms, sleep=True)
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
    ready = self._selector.select(timeout)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
OSError: [Errno 9] Bad file descriptor

Jac*_*cky 5

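A Kafka consumer can be either multi-process or multi-threaded (make sure the client library you use properly supports Kafka consumer groups, which were required in earlier versions of Kafka); the choice is up to you.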

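However, if you use processes, the Kafka client library has to take care to be fork-safe: the underlying TCP connections to the Kafka brokers must not be shared between processes. That is why you get the error. In your traceback the failing call is self._kqueue.control(...): on macOS the client's selector is backed by kqueue, and a kqueue descriptor is not inherited by a child created with fork(), so each child's copy of the consumer polls a descriptor that no longer exists.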

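The fix is not to create the KafkaConsumer before starting the processes, but inside each process instead. In the question's code, KafkaWrapper.__init__ builds the consumer in the parent process, when ServiceA() and ServiceB() are constructed in main().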
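
A minimal sketch of this, assuming kafka-python (the library in the traceback) and reusing the question's topic and server names. The service object keeps only plain settings, and the consumer is imported and built inside the child's target function. The names consume and Service, and group_id="my-group", are hypothetical.

```python
import multiprocessing as mp


def consume(topic, bootstrap_servers, group_id):
    """Runs entirely inside a child process."""
    # Import and build the consumer here, after the process has started, so
    # its TCP sockets and its selector (kqueue on macOS) belong to this
    # process only.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             group_id=group_id)
    try:
        for message in consumer:
            print(message.value)
    finally:
        consumer.close()


class Service:
    """Holds only plain, picklable settings; never a live KafkaConsumer."""

    def __init__(self, topic, bootstrap_servers, group_id="my-group"):
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id  # hypothetical consumer group name

    def start(self):
        consume(self.topic, self.bootstrap_servers, self.group_id)


def main():
    services = [Service("my-topic", "my.server.com") for _ in range(2)]
    # Only the picklable Service objects cross the process boundary.
    jobs = [mp.Process(target=service.start) for service in services]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
```

Call main() from an if __name__ == "__main__": guard as in the question; the guard is required with the spawn start method (the default on Windows and, since Python 3.8, on macOS). Because both consumers use the same group_id, Kafka splits the topic's partitions between them instead of delivering every message to both; with a single partition, one of the processes would sit idle.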

Another approach is to fetch messages with a single thread/process and use a separate process pool to do the actual work.
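
A minimal sketch of this second approach, again assuming kafka-python: one consumer polls batches in the main process, and only the message payloads are sent to a concurrent.futures.ProcessPoolExecutor. The names handle, process_batch and run are hypothetical, and handle() just upper-cases the payload as a stand-in for real work.

```python
from concurrent.futures import ProcessPoolExecutor


def handle(value):
    """Hypothetical per-message work; replace with the real processing.

    Runs in a pool worker and only ever receives the payload bytes.
    """
    return value.upper()


def process_batch(pool, values):
    """Fan one batch of payloads out to the pool; results keep input order."""
    return list(pool.map(handle, values))


def run(topic, bootstrap_servers, workers=4):
    """One consumer in this process, CPU-heavy work in a process pool."""
    from kafka import KafkaConsumer  # only run() needs Kafka, not the helpers

    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    try:
        # Workers receive only pickled payloads through the pool's queues;
        # they never use the consumer or its sockets.
        with ProcessPoolExecutor(max_workers=workers) as pool:
            while True:
                # poll() returns {TopicPartition: [ConsumerRecord, ...]}
                batch = consumer.poll(timeout_ms=1000, max_records=500)
                values = [record.value
                          for records in batch.values()
                          for record in records]
                for result in process_batch(pool, values):
                    print(result)
    finally:
        consumer.close()
```

process_batch() relies only on Executor.map, so it can be tried without a broker, and run() should also be called from an if __name__ == "__main__": guard. Note that kafka-python auto-commits offsets by default (enable_auto_commit=True), so a crash in the middle of a batch may skip messages; if that matters, disable auto-commit and call consumer.commit() after each batch has been processed.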