具有多处理功能的Python itertools - 巨大的列表与使用迭代器的低效CPU使用率

xis*_*one 7 python multithreading python-itertools

我处理n个元素(在下面命名为"pair")变体,重复用作我函数的参数.显然,只要"r"列表不足以消耗所有内存,一切都可以正常工作.问题是我必须最终为6个元素重复16次.我在云端使用40核心系统.

代码看起来如下所示:

if __name__ == '__main__':
  pool = Pool(39)
  r = itertools.product(pairs,repeat=16)
  pool.map(f, r)
Run Code Online (Sandbox Code Playgroud)

我相信我应该使用迭代器而不是预先创建巨大的列表,这里问题开始..

我尝试使用以下代码解决问题:

if __name__ == '__main__':
  pool = Pool(39)
  for r in itertools.product(pairs,repeat=14):
    pool.map(f, r)
Run Code Online (Sandbox Code Playgroud)

内存问题消失了,但每个内核的CPU使用率是5%.现在,代码的单核版本比这更快.

如果你能引导我一点,我真的很感激..

谢谢.

Sha*_*ger 5

原始代码不是list在您自己的代码中创建前期代码(itertools.product返回一个生成器),而是pool.map实现了整个生成器(因为它假设如果您可以存储所有输出,那么您也可以存储所有输入)。

不要pool.map在这里使用。如果需要有序的结果,请使用pool.imap,或者如果结果的顺序不重要,请使用pool.imap_unordered。迭代任一调用的结果(不要包装list),并在结果出现时进行处理,内存不成问题:

if __name__ == '__main__':
    pool = Pool(39)
    for result in pool.imap(f, itertools.product(pairs, repeat=16)):
        print(result)
Run Code Online (Sandbox Code Playgroud)

如果您要使用pool.map副作用,则只需要运行它即可完成,但结果和顺序无关紧要,可以使用imap_unorderedcollections.deque有效地消耗掉“结果”而无需实际存储任何内容,从而可以显着提高性能(一个dequemaxlen0是最快的,最低存储器办法迫使一个迭代,而不存储结果运行到完成):

from collections import deque

if __name__ == '__main__':
    pool = Pool(39)
    deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)
Run Code Online (Sandbox Code Playgroud)

最后,我有点怀疑要指定39个Pool工人。multiprocessing对于CPU绑定的任务非常有利;如果您使用的工人数多于拥有CPU内核的工人并获得收益,那么multiprocessingIPC付出的代价可能会超过获得的收益,而使用更多的工人只是通过缓冲更多数据来掩盖问题。

如果您的工作在很大程度上受I / O约束,则可以尝试使用基于线程的池,这将避免酸洗和酸洗的开销以及父子进程之间的IPC成本。与基于进程的池不同,Python线程受GIL问题的约束,因此您的CPU绑定工作在Python中(不包括GIL释放对I / O的ctypes调用,对.dll / .so文件的调用以及某些第三方扩展,例如numpy释放用于大量CPU工作的GIL)仅限于一个内核(在用于CPU绑定工作的Python 2.x中,您经常浪费大量的解决GIL争用并执行上下文切换; Python 3消除了大部分浪费) 。但是,如果您的工作在很大程度上受到I / O的限制,则对I / O的阻塞会释放GIL以允许其他线程运行,因此,只要其中大多数延迟在I / O上,您就可以拥有许多线程。切换也很容易(只要您没有通过假设您可以写入“共享”状态并且不影响其他worker或父进程来设计程序依赖于每个worker的单独地址空间),只需更改:

from multiprocessing import Pool
Run Code Online (Sandbox Code Playgroud)

至:

from multiprocessing.dummy import Pool
Run Code Online (Sandbox Code Playgroud)

multiprocessing.dummy根据线程而不是进程获取池的版本。