Bre*_*len 11 python multiprocessing
当我使用生成器作为 multiprocessing.Pool.map 函数的可迭代参数时:
pool.map(func, iterable=(x for x in range(10)))
Run Code Online (Sandbox Code Playgroud)
似乎发电机在func被调用之前已经完全耗尽。
我想产生每个项目并将其传递给每个进程,谢谢
tde*_*ney 18
multiprocessing.map__len__在处理之前将没有方法的可迭代对象转换为列表。这样做是为了帮助计算块大小,池使用它来对工作参数进行分组并减少调度作业的往返成本。这不是最优的,尤其是当 chunksize 为 1 时,但由于map必须以一种或另一种方式耗尽迭代器,因此它通常不是一个重要问题。
相关代码在pool.py. 注意它的使用len:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
'''
Helper function to implement map, starmap and their async counterparts.
'''
if self._state != RUN:
raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
Run Code Online (Sandbox Code Playgroud)
唉,这并没有明确的定义。这是我在 Python 3.6.1 下运行的测试用例:
import multiprocessing as mp
def e(i):
if i % 1000000 == 0:
print(i)
if __name__ == '__main__':
p = mp.Pool()
def g():
for i in range(100000000):
yield i
print("generator done")
r = p.map(e, g())
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
您看到的第一件事是“生成器完成”消息,并且峰值内存使用量非常高(正如您怀疑的那样,正是因为生成器在传递任何工作之前就已耗尽)。
但是,map()请像这样替换调用:
r = list(p.imap(e, g()))
Run Code Online (Sandbox Code Playgroud)
现在内存使用量仍然很小,并且“生成器完成”出现在输出端。
然而,您不会等待足够长的时间来看到这一点,因为它非常慢:-( imap()不仅将该可迭代视为可迭代,而且有效地一次仅跨进程边界传递 1 个项目。为了也恢复速度,这可以工作:
r = list(p.imap(e, g(), chunksize=10000))
Run Code Online (Sandbox Code Playgroud)
在现实生活中,我更有可能迭代imap()(或imap_unordered()) 结果,而不是将其强制放入列表中,然后循环结果的内存使用量也很小。
| 归档时间: |
|
| 查看次数: |
6743 次 |
| 最近记录: |