bit*_*man 7 python list multiprocessing
任何人都可以帮助我在多个python进程之间共享列表.问题是让self.ID_List和self.mps_in_process在以下代码中工作.
import time, random
from multiprocessing import Process #, Manager, Array, Queue
class MP_Stuff():
def __init__(self, parent, id):
time.sleep(1 + random.random()*10) # simulate data processing
parent.killMP(id)
class ParamHandler():
def doFirstMP(self, IDs):
self.mps_in_process = []
self.ID_List = IDs
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def doMP(self):
for tmp in range(3): # nr of concurrent processes
if len(self.ID_List) > 0:
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def killMP(self, kill_id):
self.mps_in_process.remove(kill_id)
self.doMP()
if __name__ == '__main__':
ID_List = [1,2,3,4,5,6]
paramSet = ParamHandler()
paramSet.doFirstMP(ID_List)Run Code Online (Sandbox Code Playgroud)
很快,代码所做的是,根据self.ID_List中的数据id处理一些数据(这里是MP_Stuff中的随机时间).为了知道正在使用多少数据id,我们使用了self.mps_in_process(这里的nr进程是硬编码的,但实际上它是动态的).
问题是跨多个进程共享mps_in_process和ID_List.当前的代码进入了无限循环.实际上在多处理库中有很好的描述:
"如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有的话)可能与调用Process.start()时父进程中的值不同."
但是,我无法弄清楚如何让mps_in_process和ID_List工作.我不能使用Queue,因为从mps_in_process中取出元素的方式是随机的.我不能使用Array,因为.pop(0)不起作用.我不能使用Manager().list(),因为.remove()和len(ID_List)不起作用.使用线程而不是多处理是没有解决方案的,因为以后必须使用freeze_support().
因此,非常欢迎任何帮助如何在进程间共享列表!
管理器工作正常(包括 len())。您的代码的问题在于,在主进程中,您不会等到处理结束,因此主进程结束并且管理器不再可访问。另外我不知道 ListProxy 的 pop 的原子性,所以也许锁会很方便。
解决办法是p.join()。
p.join但是,我很困惑为什么在 结束时就足够了doFirstMP。如果有人能解释为什么第一个 p 上的 join 在所有计算完成后返回,而不是在第一个 doMP 返回后返回,我会很高兴。
我的代码:
import time, random
from multiprocessing import Process, Manager
class MP_Stuff():
def __init__(self, parent, id):
time.sleep(1 + random.random()*5) # simulate data processing
print id , "done"
parent.killMP(id)
class ParamHandler():
def doFirstMP(self, IDs):
self.mps_in_process = []
self.ID_List = Manager().list(IDs)
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
p.join()
print "joined"
def doMP(self):
for tmp in range(3): # nr of concurrent processes
print self.ID_List
if len(self.ID_List) > 0:
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def killMP(self, kill_id):
print "kill", kill_id
self.mps_in_process.remove(kill_id)
self.doMP()
if __name__ == '__main__':
ID_List = [1,2,3,4,5,6]
paramSet = ParamHandler()
paramSet.doFirstMP(ID_List)
Run Code Online (Sandbox Code Playgroud)