Python ZMQ和多处理导致zmq.error.ZMQError:系统调用中断

Mar*_*vin 5 python multiprocessing zeromq pyzmq

我有一个Python脚本,在其中绑定了几个(例如5个)ZMQ接收器套接字,如下所示:

receiver_1 = context.socket(zmq.PULL)
receiver_1.bind("tcp://*:5555")
...
receiver_5 = context.socket(zmq.PULL)
receiver_5.bind("tcp://*:5559")

receivers = [receiver_1, ..., receiver_5]
Run Code Online (Sandbox Code Playgroud)

然后,我启动一些Google Compute Engine实例,它们连接相应的发送方套接字。

我想并行地从这些套接字中提取数据,因此我尝试使用多处理池来实现。代码看起来像这样:

def recv_result(i):
    result_str = receivers[i].recv()
    return cPickle.loads(result_str)

pool = multiprocessing.Pool()
while True:
    results = pool.map(recv_result, [i for i in range(len(receivers))])
    # break when all results have been received
    ...
Run Code Online (Sandbox Code Playgroud)

运行脚本时出现的错误如下所示:

Traceback (most recent call last):
  ...
  File ...
    results = pool.map(recv_result, [i for i in range(len(receivers))])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
zmq.error.ZMQError: Interrupted system call
Run Code Online (Sandbox Code Playgroud)

我也尝试过使用multiprocessing.Process实现相同的功能,但是我得到的错误基本上相同,尽管杂乱得多。

我正在尝试做的是更有效地接收来自GCE实例的所有结果,因为我发现这是脚本的瓶颈(在我当前的工作实现中,我只有一个接收器套接字,并且依次接收所有GCE实例的结果)。如果有人可以指出我当前代码中的错误,或者对实现目标的更好方法的建议,将不胜感激!

Jan*_*sky 2

一些提示:

  • 很好,你使用 ZeroMQ - 它可以为你做很多事情,而无需编写太多代码
  • 不要过度优化。您不会通过 ZeroMQ 通信的多处理/线程获得任何好处,它已经非常快并且能够交换令人难以置信的大量消息
  • 如果使用线程/多处理,切勿共享 zmq 上下文,它必须是线程私有的,否则会中断。这可能是您异常的原因。
  • 如果当前代码使用阻塞 Zeromq 发送和接收,请将其更改为非阻塞。检查如何使用轮询。