“多处理”与“concurrent.futures”中的最大工人数量

A. *_*dry 5 python multiprocessing concurrent.futures

在 Python 3.8 中,concurrent.futures.ProcessPoolExecutor已更新为将 Windows 上可使用的最大工作进程(进程)数量限制为 61。有关原因,请参阅thisthis,但据我了解:

  • 在 Windows 上,multiprocessing调用 Windows API 函数WaitForMultipleObjects,该函数用于等待进程完成。它最多可以等待 63 个对象,减去结果队列读取器和线程唤醒读取器,因此有 61 个限制。(即Windows 使用每个进程一个线程来跟踪进程)。

(另请参阅此问题

multiprocessing但是,仍然使用os.cpu_count(). Value Error它一开始抛出 a ,但随后继续并使用 100% 的 CPU 核心。例如,

Exception in thread Thread-N:
Traceback (most recent call last):
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 519, in _handle_workers       
    cls._wait_for_updates(current_sentinels, change_notifier)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 499, in _wait_for_updates     
    wait(sentinels, timeout=timeout)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 879, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 98
Run Code Online (Sandbox Code Playgroud)

我的机器有 96 个核心。这个“错误”真的是一个错误吗?如果不是,我是否应该只使用该multiprocessing模块而不是该concurrent.futures模块,这将我的CPU使用率限制为61个核心?

编辑:我怀疑这是一个错误,因为我假设multiprocess将继续等待引发错误的进程完成。如果我不限制核心数量,似乎就会发生这种情况(程序在 CPU 使用率下降后就会挂起)。不过,我不确定是否真的如此。

Boo*_*boo 2

你的问题提得很好。查看代码,这似乎是一个不可恢复的错误。但在我看来,无法理解的是,在 Windows 下,会有代码将ThreadPoolExecutor池大小限制为 61,而不对类强制执行该限制multiprocessing.Pool。无论如何,使用以下程序进行检查应该很容易。如果没有打印完成!并且挂起,我想说肯定存在问题,如果您使用以下命令,则应该明确限制池大小multiprocessing.Pool

import multiprocessing

def worker(x):
    return x ** 2

def main():
    pool = multiprocessing.Pool(96)
    results = pool.map(worker, range(96))
    assert len(results) == 96
    pool.close()
    pool.join()
    print('Done!')

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

但您的程序挂起的事实相当确定上述程序将挂起,我怀疑您甚至不会看到该assert声明。无论哪种方式,使用大于 61 的池大小都是不可靠的。