Using `event` for multiprocessing pause-restart functionality

Asked by alp*_*ric (score 5) · Tags: python, multiprocessing

I'm using the code posted below to enable pause-restart functionality for a `multiprocessing` Pool.

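I'd appreciate it if you could explain why the `event` variable has to be passed as an argument to the `setup()` function, and why a global variable `unpaused` is then declared inside `setup()` and set to the same object as `event`: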

def setup(event):
    global unpaused
    unpaused = event

I'd also like to understand the reasoning behind the following statement:

pool=mp.Pool(2, setup, (event,))

The first argument is the number of worker processes the Pool should start. The second argument, `setup()`, is the function mentioned above.
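For reference, the positional arguments in `mp.Pool(2, setup, (event,))` correspond to the documented parameters `Pool(processes, initializer, initargs)`: each worker process calls `initializer(*initargs)` once, right after it starts. Below is a minimal Python 3 sketch with the keywords spelled out; `init_worker`, `check` and `run_demo` are hypothetical names chosen for illustration.

```python
import multiprocessing as mp
import os

# Placeholder global; each worker overwrites it in init_worker().
shared_event = None

def init_worker(event):
    """Initializer: runs once in each worker process when it starts."""
    global shared_event
    shared_event = event

def check(_):
    # Report which worker ran the task and whether it can see the set Event.
    return os.getpid(), shared_event is not None and shared_event.is_set()

def run_demo():
    event = mp.Event()
    event.set()
    # Same as mp.Pool(2, init_worker, (event,)), written with keywords.
    with mp.Pool(processes=2, initializer=init_worker, initargs=(event,)) as pool:
        return pool.map(check, range(4))

if __name__ == "__main__":
    for pid, seen in run_demo():
        print(pid, seen)
```

All four tasks run in at most two worker processes, and each task sees the Event because the initializer stored it in that worker's global.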

Why can't all of this be done like this instead:

global event
event=mp.Event()
pool = mp.Pool(processes=2)

Then, whenever we need to pause or restart the job, we would just use:

Pause:

event.clear()

Restart:

event.set()

Why do we need a global variable `unpaused` at all? I don't get it! Please advise.


import time
import multiprocessing as mp

def myFunct(arg):
    proc=mp.current_process()
    print 'starting:', proc.name, proc.pid,'...\n'
    for i in range(110):
        for n in range(500000):
            pass
    print '\t ...', proc.name, proc.pid, 'completed\n'

def setup(event):
    global unpaused
    unpaused = event

def pauseJob():
    event.clear()

def continueJob():
    event.set()


event=mp.Event()

pool=mp.Pool(2, setup, (event,))
pool.map_async(myFunct, [1,2,3])

event.set()

pool.close()
pool.join()

Answer by dan*_*ano (score 16)

You're misunderstanding how `Event` works. But first, I'll cover what `setup` is doing.

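The `setup` function is executed in each child process of the pool immediately after that process starts. So you are setting a global variable inside every child (called `unpaused` in your code, `event` in the example below) to the same `multiprocessing.Event` object you created in the main process. In the end, every child process has a global variable that refers to that same `multiprocessing.Event` object, which lets you signal the child processes from the main process, just as you want. Look at this example: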

import multiprocessing

event = None
def my_setup(event_):
  global event
  event = event_
  print "event is %s in child" % event


if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Pool(2, my_setup, (event,))
    print "event is %s in parent" % event
    p.close()
    p.join()

Output:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent

As you can see, `event` is the same object in both child processes and in the parent, just as you want.
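Matching addresses in the printout are a hint, not proof of shared state (a forked child starts with a copy of the parent's memory, so addresses can match). A more direct check, sketched below in Python 3, is to set the Event inside a worker and observe it from the parent. `set_from_child` and `run_demo` are hypothetical helper names; `my_setup` mirrors the example above.

```python
import multiprocessing as mp

event = None  # filled in by my_setup() inside each worker

def my_setup(event_):
    global event
    event = event_

def set_from_child(_):
    # Signal the parent through the Event received via the initializer.
    event.set()
    return mp.current_process().name

def run_demo():
    shared = mp.Event()
    before = shared.is_set()
    with mp.Pool(2, my_setup, (shared,)) as pool:
        pool.map(set_from_child, [0])
    # The parent now sees the flag that a child set.
    return before, shared.is_set()

if __name__ == "__main__":
    print(run_demo())
```

Because the Event is handed over through `initargs`, this works with any start method, not only `fork`.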

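However, `event` doesn't actually need to be passed to `setup`. You can simply inherit the `event` instance from the parent process (this works when the pool's workers are created with the `fork` start method, the traditional default on Linux):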

import multiprocessing

event = None

def my_worker(num):
    print "event is %s in child" % event

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    pool.map_async(my_worker, [i for i in range(pool._processes)]) # Just call my_worker for every process in the pool.

    pool.close()
    pool.join()
    print "event is %s in parent" % event

Output:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent
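Inheriting a module-level global only works when the workers are forked. Under the `spawn` start method (the default on Windows, and on macOS since Python 3.8), each worker re-imports the main module, so it only sees the module-level `event = None`. The Python 3 sketch below runs the same worker under both methods via `multiprocessing.get_context`; `run` is a hypothetical helper, and the script must be run from a file so that `spawn` can re-import it.

```python
import multiprocessing as mp

# Module-level global, as in the inheritance example above.
event = None

def my_worker(_):
    # True means this worker sees no Event (the re-imported module's None).
    return event is None

def run(method):
    global event
    ctx = mp.get_context(method)
    event = ctx.Event()  # assigned in the parent before the pool starts
    with ctx.Pool(2) as pool:
        return pool.map(my_worker, range(2))

if __name__ == "__main__":
    print("spawn:", run("spawn"))
    if "fork" in mp.get_all_start_methods():
        print("fork:", run("fork"))
```

Under `spawn` both workers report `True`, and under `fork` both report `False`. Passing the Event through the pool's initializer, as in the question, avoids this platform dependence.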

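This is simpler, and it relies on inheritance, which is how semaphore-based objects are meant to be shared between parent and child processes. In fact, if you try to pass `event` directly to the worker function as a task argument, you'll get an error: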

RuntimeError: Semaphore objects should only be shared between processes through inheritance
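The Python 3 sketch below reproduces that failure and shows one alternative: an Event created by a `multiprocessing.Manager` is a picklable proxy, so it can travel as an ordinary task argument (at the cost of going through the manager process). `report`, `try_plain_event` and `try_manager_event` are hypothetical names. The exact class named in the error message may differ between Python versions, so the test only checks for the word "inheritance".

```python
import multiprocessing as mp

def report(event):
    # Only reached if the argument could be pickled and sent to a worker.
    return event.is_set()

def try_plain_event():
    """A plain mp.Event() cannot be pickled as a task argument."""
    event = mp.Event()
    with mp.Pool(2) as pool:
        try:
            pool.map(report, [event])
        except RuntimeError as exc:
            return str(exc)
    return None

def try_manager_event():
    """A Manager-backed Event is a proxy object and pickles fine."""
    with mp.Manager() as manager:
        event = manager.Event()
        event.set()
        with mp.Pool(2) as pool:
            return pool.map(report, [event, event])

if __name__ == "__main__":
    print("plain Event:", try_plain_event())
    print("Manager Event:", try_manager_event())
```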

Now, back to how you're misunderstanding `Event`. `Event` is meant to be used like this:

import time
import multiprocessing

def event_func(num):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

if __name__ == "__main__":
    event = multiprocessing.Event()

    pool = multiprocessing.Pool()
    a = pool.map_async(event_func, [i for i in range(pool._processes)])

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    pool.close()
    pool.join()

Output:

main is sleeping
    <Process(PoolWorker-1, started daemon)> is waiting
    <Process(PoolWorker-2, started daemon)> is waiting
    <Process(PoolWorker-4, started daemon)> is waiting
    <Process(PoolWorker-3, started daemon)> is waiting
main is setting event
    <Process(PoolWorker-2, started daemon)> has woken up
    <Process(PoolWorker-1, started daemon)> has woken up
    <Process(PoolWorker-4, started daemon)> has woken up
    <Process(PoolWorker-3, started daemon)> has woken up

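As you can see, the child processes have to call `event.wait()` explicitly in order to be paused, and they are unpaused when `event.set()` is called in the main process. Right now none of your workers call `event.wait()`, so none of them can ever be paused. I suggest taking a look at the documentation for `threading.Event`, which `multiprocessing.Event` replicates.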
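Applied to the code in the question, a minimal Python 3 sketch keeps the `setup`-style initializer (which works under any start method) and adds an `unpaused.wait()` checkpoint inside the worker's outer loop. A pause only takes effect when a worker reaches that checkpoint, so work between checkpoints always runs to completion. `init_worker`, `my_funct`, `run_demo` and the loop sizes are illustrative assumptions.

```python
import multiprocessing as mp
import time

unpaused = None  # set in each worker by init_worker()

def init_worker(event):
    global unpaused
    unpaused = event

def my_funct(arg):
    total = 0
    for i in range(110):
        # Checkpoint: blocks here while the main process has cleared the event.
        unpaused.wait()
        for n in range(10000):
            total += 1
    return arg, total

def run_demo():
    event = mp.Event()  # starts cleared, i.e. the job begins paused
    with mp.Pool(2, init_worker, (event,)) as pool:
        result = pool.map_async(my_funct, [1, 2, 3])
        time.sleep(0.5)
        paused_ready = result.ready()  # False: every worker is blocked in wait()
        event.set()                    # "restart": release all waiting workers
        values = result.get(timeout=60)
    return paused_ready, values

if __name__ == "__main__":
    print(run_demo())
```

Calling `event.clear()` later makes each worker stop at its next checkpoint, and `event.set()` resumes them. This is exactly the pause/restart behaviour the question asks for.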