Python中多处理的内存错误

pce*_*con 9 python memory multiprocessing

我正在尝试使用Python执行一些昂贵的科学计算.我必须读取存储在csv文件中的一堆数据然后进行处理.由于每个过程需要很长时间,我有8个处理器可供使用,我试图使用该Pool方法Multiprocessing.

这就是我构建多处理调用的方式:

    pool = Pool()
    vector_components = []
    for sample in range(samples):
        vector_field_x_i = vector_field_samples_x[sample]
        vector_field_y_i = vector_field_samples_y[sample]
        vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                           vector_field_x_i, vector_field_y_i))
        vector_components.append(vector_component)
    pool.close()
    pool.join()

    vector_components = map(lambda k: k.get(), vector_components)

    for vector_component in vector_components:
        CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
Run Code Online (Sandbox Code Playgroud)

我正在运行500个样本的数据集,大小等于100(x_dim)乘以100(y_dim).

在那之前一切正常.

然后我收到500个400 x 400样本的数据集.

运行时,我在调用时遇到错误get.

我还尝试运行400 x 400的单个样本并得到相同的错误.

Traceback (most recent call last):
  File "__init__.py", line 33, in <module>
    VfD.samples_vector_field_decomposer(samples, x_dim, y_dim, x_steps, y_steps, vector_field_samples_x, vector_field_samples_y)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in samples_vector_field_decomposer
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in <lambda>
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/.pyenv/versions/2.7.5/lib/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
MemoryError
Run Code Online (Sandbox Code Playgroud)

我该怎么办?

先感谢您.

dan*_*ano 8

-现在你保持若干名单在内存中vector_field_x,vector_field_y,vector_components,然后一个单独的副本vector_components的过程map调用(这是当你真正耗尽内存).您可以vector_components通过使用pool.imap而不是pool.apply_async手动创建的列表来避免需要列表的副本.imap返回迭代器而不是完整列表,因此您永远不会将所有结果都存储在内存中.

通常,pool.map将传递给它的iterable打破为块,并将这些块发送到子进程,而不是一次发送一个元素.这有助于提高性能.因为imap使用迭代器而不是列表,所以它不知道您传递给它的可迭代的完整大小.在不知道迭代的大小的情况下,它不知道每个块有多大,所以它默认为chunksize1,这将起作用,但可能无法很好地执行.为了避免这种情况,你可以为它提供一个好的chunksize参数,因为你知道iterable是sample元素long.你的500元素列表可能没什么区别,但值得尝试.

这是一些演示所有这些的示例代码:

import multiprocessing
from functools import partial


def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.


if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples // 4 * num_workers)
    if extra:
        chunksize += 1

    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for vector in results:
        CsvH.write_vector_field(vector_component, 
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

应该可以让您避免这些MemoryError问题,但如果没有,您可以尝试imap在整个样本中运行较小的块,然后进行多次传递.我认为你不会有任何问题,因为你没有建立任何额外的列表,除了vector_field_*你开始的列表.