在Python中为多处理共享内存的更好方法是什么?

alf*_*dox 10 python multithreading shared-memory multiprocessing python-multiprocessing

我已经解决了这个问题一个星期了,而且它变得非常令人沮丧,因为每次我实现一个更简单但相似的我需要做的比例示例时,事实证明多处理会捏造它.它处理共享内存的方式令我困惑,因为它非常有限,它可能会很快变得无用.

所以我的问题的基本描述是我需要创建一个进程,该进程在一些参数中传递以打开图像并创建大小为60x40的大约20K补丁.这些补丁一次保存到列表2中,需要返回到主线程,然后由GPU上运行的其他2个并发进程再次处理.

过程和工作流程以及所有大部分都需要处理的事情,我现在需要的是最容易被认为是最困难的部分.我无法保存并将带有20K补丁的列表返回到主线程.

第一个问题是因为我将这些补丁保存为PIL图像.然后我发现添加到Queue对象的所有数据都必须被pickle.第二个问题是我然后将补丁转换为每个60x40的数组并将它们保存到列表中.而现在仍然不起作用?显然,当您调用queue_obj.get()程序挂起时,队列可以保存有限数量的数据.

我尝试过很多其他的东西,而且我尝试的每一件新东西都不起作用,所以我想知道是否有人有一个库的其他建议我可以使用它来共享对象而没有所有模糊的东西?

这是我正在看的一种示例实现.请记住,这完全正常,但完全实现不会.我确实有代码打印信息性消息,以确保保存的数据具有完全相同的形状和一切,但由于某种原因它不起作用.在完整实现中,独立进程成功完成,但在q.get()处冻结.

from PIL import Image
from multiprocessing import Queue, Process
import StringIO
import numpy

img = Image.open("/path/to/image.jpg")
q = Queue()
q2 = Queue()
#
#
# MAX Individual Queue limit for 60x40 images in BW is 31,466.
# Multiple individual Queues can be filled to the max limit of 31,466.
# A single Queue can only take up to 31,466, even if split up in different puts.
def rz(patch, qn1, qn2):
    totalPatchCount = 20000
    channels = 1
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    # ImgArray = numpy.asarray(im, dtype=numpy.float32)
    list_im_arr = []
    # ----Create a 4D Array
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60))
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    imgArray = imgArray[numpy.newaxis, ...]
    # ----End 4D array
    # list_im_arr2 = []
    for i in xrange(totalPatchCount):
        # returnImageArray[i] = imgArray
        list_im_arr.append(imgArray)
    qn1.put(list_im_arr)
    qn1.cancel_join_thread()
    # qn2.cancel_join_thread()
    print "PROGRAM Done"

# rz(img,q,q2)
# l = q.get()

#
p = Process(target=rz,args=(img, q, q2,))
p.start()
p.join()
#
# # l = []
# # for i in xrange(1000): l.append(q.get())
#
imdata = q.get()
Run Code Online (Sandbox Code Playgroud)

use*_*217 6

队列用于进程之间的通信.在你的情况下,你真的没有这种沟通.您可以简单地让进程返回结果,并使用该.get()方法来收集它们.(记得添加if __name__ == "main":,参见编程指南)

from PIL import Image
from multiprocessing import Pool, Lock
import numpy

img = Image.open("/path/to/image.jpg")

def rz():
    totalPatchCount = 20000
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    list_im_arr = [imgArray] * totalPatchCount  # A more elegant way than a for loop
    return list_im_arr

if __name__ == '__main__':  
    # patch = img....  Your code to get generate patch here
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')

    pool = Pool(2)
    imdata = [pool.apply_async(rz).get() for x in range(2)]
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

现在,根据这篇文章的第一个答案,多处理只传递可选择的对象.在多处理中,酸洗可能是不可避免的,因为进程不共享内存.他们根本不住在同一个宇宙中.(他们在第一次产生时会继承记忆,但是他们无法触及他们自己的宇宙).PIL图像对象本身不可拾取.您可以通过仅提取存储在其中的图像数据来使其可选择,如此建议的帖子.

由于您的问题主要是I/O绑定,您还可以尝试多线程.它可能会更快达到您的目的.线程共享所有内容,因此不需要酸洗.如果您使用的是python 3,那么这ThreadPoolExecutor是一个很棒的工具.对于Python 2,您可以使用ThreadPool.为了实现更高的效率,您必须重新安排您的工作方式,您希望分解流程并让不同的线程完成工作.

from PIL import Image
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
import numpy

img = Image.open("/path/to/image.jpg")
lock = Lock():
totalPatchCount = 20000

def rz(x):
    patch = ...
    return patch

pool = ThreadPool(8)
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)]
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)