多处理IOError:错误的消息长度

And*_*ndy 21 python numpy pool multiprocessing ioerror

IOError: bad message length在向map函数传递大参数时得到了一个.我怎么能避免这个?我设置N=1500或更大时会发生错误.

代码是:

import numpy as np
import multiprocessing

def func(args):
    i=args[0]
    images=args[1]
    print i
    return 0

N=1500       #N=1000 works fine

images=[]
for i in np.arange(N):
    images.append(np.random.random_integers(1,100,size=(500,500)))

iter_args=[]
for i in range(0,1):
    iter_args.append([i,images])

pool=multiprocessing.Pool()
print pool
pool.map(func,iter_args)
Run Code Online (Sandbox Code Playgroud)

在文档中multiprocessingrecv_bytes一个引发IOError 的函数.可能是因为这个吗?(https://python.readthedocs.org/en/v2.7.2/library/multiprocessing.html)

编辑 如果我使用imagesnumpy数组而不是列表,我得到一个不同的错误:SystemError: NULL result without error in PyObject_Call.有点不同的代码:

import numpy as np
import multiprocessing

def func(args):
    i=args[0]
    images=args[1]
    print i
    return 0

N=1500       #N=1000 works fine

images=[]
for i in np.arange(N):
    images.append(np.random.random_integers(1,100,size=(500,500)))
images=np.array(images)                                            #new

iter_args=[]
for i in range(0,1):
    iter_args.append([i,images])

pool=multiprocessing.Pool()
print pool
pool.map(func,iter_args)
Run Code Online (Sandbox Code Playgroud)

EDIT2我使用的实际功能是:

def func(args):
    i=args[0]
    images=args[1]
    image=np.mean(images,axis=0)
    np.savetxt("image%d.txt"%(i),image)
    return 0
Run Code Online (Sandbox Code Playgroud)

此外,iter_args不包含相同的图像集:

iter_args=[]
for i in range(0,1):
    rand_ind=np.random.random_integers(0,N-1,N)
    iter_args.append([i,images[rand_ind]])
Run Code Online (Sandbox Code Playgroud)

Jos*_*edy 11

您正在创建一个池并将所有图像一次性发送到func().如果你可以同时处理单个图像,尝试这样的事情,在35秒内使用Python 2.7.10为N = 10000完成运行:

import numpy as np
import multiprocessing

def func(args):
    i = args[0]
    img = args[1]
    print "{}: {} {}".format(i, img.shape, img.sum())
    return 0

N=10000

images = ((i, np.random.random_integers(1,100,size=(500,500))) for i in xrange(N))
pool=multiprocessing.Pool(4)
pool.imap(func, images)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

这里的关键是使用迭代器,因此您不必一次将所有数据保存在内存中.例如,我将包含所有数据的数组中的图像转换为生成器表达式,以便仅在需要时创建图像.你可以修改它来从磁盘或其他任何东西加载你的图像.我还使用了pool.imap而不是pool.map.

如果可以,请尝试在worker函数中加载图像数据.现在,您必须序列化所有数据并将其发送到另一个流程.如果您的图像数据较大,这可能是一个瓶颈.

[现在我们知道func必须立即处理所有图像]

你可以对你的图像做一个迭代的意思.这是一个不使用多处理的解决方案.要使用多处理,您可以将图像划分为块,并将这些块存储到池中.

import numpy as np

N=10000
shape = (500,500)

def func(images):
    average = np.full(shape, 0)
    for i, img in images:
        average += img / N
    return average

images = ((i, np.full(shape,i)) for i in range(N))

print func(images)
Run Code Online (Sandbox Code Playgroud)


And*_*ndy 1

这就是解决问题的方法:将图像声明为全局的。

import numpy as np
import multiprocessing


N=1500       #N=1000 works fine

images=[]
for i in np.arange(N):
    images.append(np.random.random_integers(1,100,size=(500,500)))

def func(args):
    i=args[0]
    images=images
    print i
    return 0

iter_args=[]
for i in range(0,1):
    iter_args.append([i])

pool=multiprocessing.Pool()
print pool
pool.map(func,iter_args)
Run Code Online (Sandbox Code Playgroud)