多处理队列已满

roh*_*anp 12 python parallel-processing multiprocessing concurrent.futures

我正在使用concurrent.futures来实现多处理.我得到一个队列.全错误,这是奇怪的,因为我只分配10个工作.

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)
Run Code Online (Sandbox Code Playgroud)

错误:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full
Run Code Online (Sandbox Code Playgroud)

skr*_*sme 17

简短回答
我认为管道尺寸限制是根本原因.除了将数据分解成更小的块并迭代处理它们之外,您无法做很多事情.这意味着您可能需要找到一种新算法,该算法一次可以处理2000x2000阵列的一小部分,以找到奇异值组合.

细节
让我们马上就能得到一件事:你正在处理大量的信息.仅仅因为你只使用十件物品并不意味着它是微不足道的.这些项目中的每一个都是一个2000x2000阵列,其中包含4,000,000个浮点数,每个浮点数通常为64位,所以你看每个阵列大约244MB,加上Numpy的ndarrays中标记的其他数据.

ProcessPoolExecutor的工作原理是启动一个单独的线程管理工作进程.管理线程使用multiprocesing.Queue将作业传递给调用的worker _call_queue.这些multiprocessing.Queue实际上只是管道周围的花式包装,你试图传递给工人的ndarrays可能太大了,管道无法正常处理.

阅读Python问题8426表明,即使您可以查找操作系统的一些标称管道尺寸限制,也要确定管道的确切尺寸.有太多变量使它变得简单.即使从队列中拉出事物的顺序也会在底层管道中引发触发奇怪错误的竞争条件.

我怀疑你的一个工人正在从它上面得到一个不完整或被破坏的对象_call_queue,因为那个队列的管道里满是你的巨大物体.那个工人以不干净的方式死亡,工作队列管理器检测到这个失败,所以它放弃工作并告诉剩下的工人退出.但是,通过使它们做到这一点毒丸_call_queue,这是仍然充满了巨贵ndarrays的.这就是您获得完整队列异常的原因 - 您的数据填满了队列,然后管理线程尝试使用相同的队列将控制消息传递给其他工作者.

我认为这是在程序中不同实体之间混合数据和控制流的潜在危险的典型例子.您的大数据不仅会阻止更多数据被工作人员接收,还会阻止管理员与工作人员的控制通信,因为他们使用相同的路径.

我无法重现你的失败,所以我不能确定所有这些都是正确的.但是,你可以使这个代码使用200x200阵列(~2.5MB)这一事实似乎支持这一理论.标称管道大小限制似乎以KB或几MB为单位进行测量,具体取决于操作系统和体系结构.这一数据量可以通过管道这一事实并不令人惊讶,尤其是当您认为如果消费者不断接收数据时,并非所有2.5MB都需要同时实际适应管道.它建议您可以通过管道连续获得的数据量的合理上限.


luc*_*oli 7

我最近偶然发现了这一点,同时调试了一个python3.6程序,该程序通过管道发送各种GB的数据.这就是我发现的(希望它可以节省别人的时间!).

就像skrrgwasme所说,如果队列管理器在发送毒丸时无法获取信号量,则会引发队列完全错误.对信号量的获取调用是非阻塞的,它会导致管理器失败(由于数据和控制流共享同一个队列,它无法发送'control'命令).请注意,上面的链接指的是python 3.6.0

现在我想知道为什么我的队列管理员会发送毒丸.一定有其他一些失败!显然发生了一些异常(在父级的某些其他子进程中?),队列管理器试图清理并关闭所有子进程.在这一点上,我有兴趣找到这个根本原因.

调试根本原因

我最初尝试记录子进程中的所有异常,但显然没有发生明确的错误.来自第3895期:

请注意,当unpickle结果失败时,multiprocessing.Pool也会中断.

似乎多处理模块在py36中被破坏,因为它不会正确捕获和处理序列化错误.

不幸的是,由于时间限制,我没有设法自己复制和验证问题,而是倾向于跳转到操作点和更好的编程实践(不要通过管道发送所有数据:).这里有几个想法:

  1. 尝试挑选应该通过管道运行的数据.由于我的数据(数百GB)的巨大性质和时间限制,我无法找到哪些记录不可序列化.
  2. 将调试器放入python3.6并打印原始异常.

动作要点

  1. 如果可能,重新构建程序以通过管道发送更少的数据.

  2. 在阅读问题3895后,看起来问题出现了酸洗错误.另一种(和良好的编程实践)可以是使用不同的方式传输数据.例如,可以让子进程写入文件并将路径返回到父进程(这只是一个小字符串,可能是几个字节).

  3. 等待未来的python版本.显然,这是在问题3895的上下文中在python版本标记v3.7.0b3上修复的.在全部的异常将被处理内部shutdown_worker.在撰写本文时,Python的当前维护版本是3.6.5