alp*_*ric 5 python multiprocessing
我正在使用下面发布的代码为Pool 启用暂停重启功能multiprocessing.
如果你解释我为什么event必须将变量作为参数发送给setup()函数,我将不胜感激.为什么然后unpaused在setup()函数范围内声明全局变量,然后将其设置为与event变量相同:
def setup(event):
global unpaused
unpaused = event
Run Code Online (Sandbox Code Playgroud)
我也想知道以下声明背后的后勤:
pool=mp.Pool(2, setup, (event,))
Run Code Online (Sandbox Code Playgroud)
提交的第一个参数是Pool要使用的CPU核心数.提交的第二个参数setup()是上面提到的函数.
为什么不能完成所有这些:
global event
event=mp.Event()
pool = mp.Pool(processes=2)
Run Code Online (Sandbox Code Playgroud)
每当我们需要暂停或重新启动作业时,我们只会使用:
暂停:
event.clear()
Run Code Online (Sandbox Code Playgroud)
重启:
event.set()
Run Code Online (Sandbox Code Playgroud)
为什么我们需要一个全局变量unpaused?我不明白!请指教.
import time
import multiprocessing as mp
def myFunct(arg):
proc=mp.current_process()
print 'starting:', proc.name, proc.pid,'...\n'
for i in range(110):
for n in range(500000):
pass
print '\t ...', proc.name, proc.pid, 'completed\n'
def setup(event):
global unpaused
unpaused = event
def pauseJob():
event.clear()
def continueJob():
event.set()
event=mp.Event()
pool=mp.Pool(2, setup, (event,))
pool.map_async(myFunct, [1,2,3])
event.set()
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
dan*_*ano 16
你误解了它是如何Event运作的.但首先,我将介绍setup正在做的事情.
该setup函数在启动后立即在池中的每个子进程中执行.因此,您要将event每个进程内部调用的全局变量设置为multiprocessing.Event您在主进程中创建的同一对象.最终,每个子进程都有一个全局变量,称为event对同一multiprocessing.Event对象的引用.这将允许您从主进程发出子进程的信号,就像您想要的那样.看这个例子:
import multiprocessing
event = None
def my_setup(event_):
global event
event = event_
print "event is %s in child" % event
if __name__ == "__main__":
event = multiprocessing.Event()
p = multiprocessing.Pool(2, my_setup, (event,))
print "event is %s in parent" % event
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
输出:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,event在两个子进程和父进程中它们是相同的.就像你想要的那样.
但是,event实际上不需要传递给设置.您可以event从父进程继承实例:
import multiprocessing
event = None
def my_worker(num):
print "event is %s in child" % event
if __name__ == "__main__":
event = multiprocessing.Event()
pool = multiprocessing.Pool(2)
pool.map_async(my_worker, [i for i in range(pool._processes)]) # Just call my_worker for every process in the pool.
pool.close()
pool.join()
print "event is %s in parent" % event
Run Code Online (Sandbox Code Playgroud)
输出:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent
Run Code Online (Sandbox Code Playgroud)
这样更简单,并且是在父和子之间传递信号量的首选方法.事实上,如果你试图event直接传递给worker函数,你会收到一个错误:
RuntimeError: Semaphore objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)
现在,回到你如何误解的方式Event.Event意思是这样使用:
import time
import multiprocessing
def event_func(num):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
if __name__ == "__main__":
event = multiprocessing.Event()
pool = multiprocessing.Pool()
a = pool.map_async(event_func, [i for i in range(pool._processes)])
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
输出:
main is sleeping
<Process(PoolWorker-1, started daemon)> is waiting
<Process(PoolWorker-2, started daemon)> is waiting
<Process(PoolWorker-4, started daemon)> is waiting
<Process(PoolWorker-3, started daemon)> is waiting
main is setting event
<Process(PoolWorker-2, started daemon)> has woken up
<Process(PoolWorker-1, started daemon)> has woken up
<Process(PoolWorker-4, started daemon)> has woken up
<Process(PoolWorker-3, started daemon)> has woken up
Run Code Online (Sandbox Code Playgroud)
如您所见,子进程需要显式调用event.wait()它们才能暂停.event.set在主进程中调用时,它们会被取消暂停.现在你的工人都event.wait没有打电话,所以他们都不能暂停.我建议你看一看的文档的threading.Event,它multiprocessing.Event复制.