结合itertools和多处理?

dig*_*ngo 12 python multiprocessing python-itertools

我有一个256x256x256Numpy数组,其中每个元素都是一个矩阵.我需要对这些矩阵中的每一个进行一些计算,并且我想使用该multiprocessing模块来加快速度.

这些计算的结果必须存储在256x256x256与原始数组相同的数组中,以便[i,j,k]原始数组中元素的矩阵结果必须放在[i,j,k]新数组的元素中.

为此,我想制作一个可以伪方式写入的列表,[array[i,j,k], (i, j, k)]并将其传递给一个"多处理"的函数.假设这matrices是从原始数组中提取的所有矩阵的列表,并且myfunc是执行计算的函数,代码看起来有点像这样:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
Run Code Online (Sandbox Code Playgroud)

然而,它似乎map_async实际上是首先创建这个巨大的列表finput:我的CPU没有做太多,但内存和交换在几秒钟内完全消耗,这显然不是我想要的.

有没有办法将这个庞大的列表传递给多处理函数而无需先显式创建它?或者你知道另一种解决这个问题的方法吗?

谢谢你!:-)

unu*_*tbu 11

multiprocessing.Pool.map*调用函数后,所有方法都会完全消耗迭代器(演示代码).要一次为一个块提供迭代器的map函数块,请使用grouper_nofill:

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)
Run Code Online (Sandbox Code Playgroud)

PS.pool.map_asyncchunksize参数做了一些不同的事情:它将iterable分解为块,然后将每个块提供给调用的工作进程map(func,chunk).如果func(item)完成得太快,这可以为工作进程提供更多数据来咀嚼,但是在你的情况下它没有帮助,因为迭代器在map_async发出调用后仍然会立即完全消耗掉.