使用带有python多处理的列表

bit*_*man 7 python list multiprocessing

任何人都可以帮助我在多个python进程之间共享列表.问题是让self.ID_List和self.mps_in_process在以下代码中工作.

import time, random
from multiprocessing import Process #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10) # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
Run Code Online (Sandbox Code Playgroud)

很快,代码所做的是,根据self.ID_List中的数据id处理一些数据(这里是MP_Stuff中的随机时间).为了知道正在使用多少数据id,我们使用了self.mps_in_process(这里的nr进程是硬编码的,但实际上它是动态的).

问题是跨多个进程共享mps_in_process和ID_List.当前的代码进入了无限循环.实际上在多处理库中有很好的描述:

"如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有的话)可能与调用Process.start()时父进程中的值不同."

但是,我无法弄清楚如何让mps_in_process和ID_List工作.我不能使用Queue,因为从mps_in_process中取出元素的方式是随机的.我不能使用Array,因为.pop(0)不起作用.我不能使用Manager().list(),因为.remove()和len(ID_List)不起作用.使用线程而不是多处理是没有解决方案的,因为以后必须使用freeze_support().

因此,非常欢迎任何帮助如何在进程间共享列表!

Kra*_*rab 3

管理器工作正常(包括 len())。您的代码的问题在于,在主进程中,您不会等到处理结束,因此主进程结束并且管理器不再可访问。另外我不知道 ListProxy 的 pop 的原子性,所以也许锁会很方便。

解决办法是p.join()

p.join但是,我很困惑为什么在 结束时就足够了doFirstMP。如果有人能解释为什么第一个 p 上的 join 在所有计算完成后返回,而不是在第一个 doMP 返回后返回,我会很高兴。

我的代码:

import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5) # simulate data processing
        print id , "done"
        parent.killMP(id)

class ParamHandler():    
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = Manager().list(IDs)
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join()
        print "joined"

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            print self.ID_List
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print "kill", kill_id
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
Run Code Online (Sandbox Code Playgroud)