zel*_*ell 10 python multiprocessing
我正在使用其多处理模块编写Python程序.该程序调用许多工作函数,每个函数产生一个随机数.一旦其中一个工人产生了大于0.7的数字,我就需要终止程序.
以下是我的程序,其中" 如何做到这一点 "部分尚未填写.任何的想法?谢谢.
import time
import numpy as np
import multiprocessing as mp
import time
import sys
def f(i):
np.random.seed(int(time.time()+i))
time.sleep(3)
res=np.random.rand()
print "From i = ",i, " res = ",res
if res>0.7:
print "find it"
# terminate ???? Question: How to do this???
if __name__=='__main__':
num_workers=mp.cpu_count()
pool=mp.Pool(num_workers)
for i in range(num_workers):
p=mp.Process(target=f,args=(i,))
p.start()
Run Code Online (Sandbox Code Playgroud)
Tim*_*ers 21
没有任何一个过程可以阻止另一个蛮力os.kill()的大锤.不要去那里.
要做到这一点,您需要重新设计基本方法:主流程和工作流程需要相互通信.
我已经把它充实了,但到目前为止的例子太过分了,无法使它变得有用.例如,正如所写的,只不过是num_workers打电话rand(),所以没有理由相信它们中的任何一个必须> 0.7.
一旦worker函数生成一个循环,它就会变得更加明显.例如,工作人员可以检查是否mp.Event在循环的顶部设置了一个,如果是,则退出.主要过程将设定Event何时希望工人停止.
当工人mp.Event发现值> 0.7时,工人可以设置不同的.主要过程将等待Event,然后设置"停止时间" Event供工人查看,然后通常循环.join()工作人员进行干净关闭.
这里充实了一个便携,干净的解决方案,假设工人将继续前进,直到至少有一个人发现值> 0.7.请注意,我从中删除numpy了,因为它与此代码无关.这里的代码应该在任何支持的平台上的任何股票Python下正常工作multiprocessing:
import random
from time import sleep
def worker(i, quit, foundit):
print "%d started" % i
while not quit.is_set():
x = random.random()
if x > 0.7:
print '%d found %g' % (i, x)
foundit.set()
break
sleep(0.1)
print "%d is done" % i
if __name__ == "__main__":
import multiprocessing as mp
quit = mp.Event()
foundit = mp.Event()
for i in range(mp.cpu_count()):
p = mp.Process(target=worker, args=(i, quit, foundit))
p.start()
foundit.wait()
quit.set()
Run Code Online (Sandbox Code Playgroud)
还有一些示例输出:
0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done
Run Code Online (Sandbox Code Playgroud)
一切都干净利落:没有追溯,没有异常终止,没有留下僵尸进程......干净如哨声.
正如@noxdafox指出的那样,有一种Pool.terminate()方法可以跨平台尽其所能地杀死工作进程,无论他们在做什么(例如,在Windows上调用平台TerminateProcess()).我不推荐它用于生产代码,因为突然终止进程会使各种共享资源处于不一致状态,或者让它们泄漏.在multiprocessing文档中有各种各样的警告,您应该添加您的OS文档.
不过,它可能是权宜之计!这是使用这种方法的完整程序.请注意,我将截止值提高到了0.95,这使得这比使用eyeblink更长的时间:
import random
from time import sleep
def worker(i):
print "%d started" % i
while True:
x = random.random()
print '%d found %g' % (i, x)
if x > 0.95:
return x # triggers callback
sleep(0.5)
# callback running only in __main__
def quit(arg):
print "quitting with %g" % arg
# note: p is visible because it's global in __main__
p.terminate() # kill all pool workers
if __name__ == "__main__":
import multiprocessing as mp
ncpu = mp.cpu_count()
p = mp.Pool(ncpu)
for i in range(ncpu):
p.apply_async(worker, args=(i,), callback=quit)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
还有一些示例输出:
$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]
Run Code Online (Sandbox Code Playgroud)