如何将生成器用作具有多处理映射功能的可迭代对象

Bre*_*len 11 python multiprocessing

当我使用生成器作为 multiprocessing.Pool.map 函数的可迭代参数时:

pool.map(func, iterable=(x for x in range(10)))
Run Code Online (Sandbox Code Playgroud)

似乎发电机在func被调用之前已经完全耗尽。

我想产生每个项目并将其传递给每个进程,谢谢

tde*_*ney 18

multiprocessing.map__len__在处理之前将没有方法的可迭代对象转换为列表。这样做是为了帮助计算块大小,池使用它来对工作参数进行分组并减少调度作业的往返成本。这不是最优的,尤其是当 chunksize 为 1 时,但由于map必须以一种或另一种方式耗尽迭代器,因此它通常不是一个重要问题。

相关代码在pool.py. 注意它的使用len

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
Run Code Online (Sandbox Code Playgroud)


Tim*_*ers 6

唉,这并没有明确的定义。这是我在 Python 3.6.1 下运行的测试用例:

import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

if __name__ == '__main__':
    p = mp.Pool()
    def g():
        for i in range(100000000):
            yield i
        print("generator done")
    r = p.map(e, g())
    p.close()
    p.join()
Run Code Online (Sandbox Code Playgroud)

您看到的第一件事是“生成器完成”消息,并且峰值内存使用量非常高(正如您怀疑的那样,正是因为生成器在传递任何工作之前就已耗尽)。

但是,map()请像这样替换调用:

r = list(p.imap(e, g()))
Run Code Online (Sandbox Code Playgroud)

现在内存使用量仍然很小,并且“生成器完成”出现在输出端。

然而,您不会等待足够长的时间来看到这一点,因为它非常慢:-( imap()不仅将该可迭代视为迭代,而且有效地一次仅跨进程边界传递 1 个项目。为了也恢复速度,这可以工作:

r = list(p.imap(e, g(), chunksize=10000))
Run Code Online (Sandbox Code Playgroud)

在现实生活中,我更有可能迭代imap()(或imap_unordered()) 结果,而不是将其强制放入列表中,然后循环结果的内存使用量也很小。

  • 我所说的“定义不明确”只是意味着_没有_可以从文档中推断出这一点 - 这是实现细节的后果。因此,不能保证“map()”将始终是急切的,或者“imap()”将始终是惰性的。但这就是他们现在的实际行为。 (2认同)