use*_*114 20 python mmap numpy multiprocessing
我正在使用Python的multiprocessing
模块并行处理大型numpy数组.numpy.load(mmap_mode='r')
在主进程中使用内存映射数组.在那之后,multiprocessing.Pool()
分叉过程(我推测).
一切似乎都很好,除了我得到的行:
AttributeError("'NoneType' object has no attribute 'tell'",)
in `<bound method memmap.__del__ of
memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>`
ignored
Run Code Online (Sandbox Code Playgroud)
在unittest日志中.尽管如此,测试仍然没有通过.
知道那里发生了什么吗?
使用Python 2.7.2,OS X,NumPy 1.6.1.
更新:
经过一些调试后,我把原因归结为一个代码路径,该代码路径使用这个内存映射的numpy数组(一小部分)作为Pool.imap
调用的输入.
显然,"问题"是multiprocessing.Pool.imap
通过将输入传递给新进程的方式:它使用pickle.这不适用于mmap
ed numpy数组,而内部的某些内容会导致错误.
我发现Robert Kern的回复似乎解决了同样的问题.他建议为imap
输入来自内存映射数组时创建一个特殊的代码路径:在生成的进程中手动映射同一个数组.
这将是如此复杂和丑陋,我宁愿忍受错误和额外的内存副本.有没有其他方法可以更轻松地修改现有代码?
Joe*_*ton 22
我通常的方法(如果你可以使用额外的内存副本)是在一个进程中执行所有IO,然后将事情发送到工作线程池.要将一个memmapped数组的片段加载到内存中x = np.array(data[yourslice])
(data[yourslice].copy()
实际上并没有这样做,这可能会导致一些混淆.).
首先,让我们生成一些测试数据:
import numpy as np
np.random.random(10000).tofile('data.dat')
Run Code Online (Sandbox Code Playgroud)
您可以使用以下内容重现错误:
import numpy as np
import multiprocessing
def main():
data = np.memmap('data.dat', dtype=np.float, mode='r')
pool = multiprocessing.Pool()
results = pool.imap(calculation, chunks(data))
results = np.fromiter(results, dtype=np.float)
def chunks(data, chunksize=100):
"""Overly-simple chunker..."""
intervals = range(0, data.size, chunksize) + [None]
for start, stop in zip(intervals[:-1], intervals[1:]):
yield data[start:stop]
def calculation(chunk):
"""Dummy calculation."""
return chunk.mean() - chunk.std()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
如果你只是改用屈服np.array(data[start:stop])
,你就可以解决问题:
import numpy as np
import multiprocessing
def main():
data = np.memmap('data.dat', dtype=np.float, mode='r')
pool = multiprocessing.Pool()
results = pool.imap(calculation, chunks(data))
results = np.fromiter(results, dtype=np.float)
def chunks(data, chunksize=100):
"""Overly-simple chunker..."""
intervals = range(0, data.size, chunksize) + [None]
for start, stop in zip(intervals[:-1], intervals[1:]):
yield np.array(data[start:stop])
def calculation(chunk):
"""Dummy calculation."""
return chunk.mean() - chunk.std()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
当然,这确实为每个块提供了额外的内存副本.
从长远来看,你可能会发现从memmap文件切换到HDF更容易.如果您的数据是多维的,则尤其如此.(我推荐h5py
,但pyTables
如果你的数据是"像桌子一样",那就太好了.)
祝你好运,无论如何!
归档时间: |
|
查看次数: |
6875 次 |
最近记录: |