aer*_*ain 12 python multiprocessing
我正在尝试在Python中使用带有多处理库的队列.执行下面的代码后(打印语句工作),但是我在队列上调用join后进程没有退出并且仍然存在.如何终止剩余的流程?
谢谢!
def MultiprocessTest(self):
print "Starting multiprocess."
print "Number of CPUs",multiprocessing.cpu_count()
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = multiprocessing.JoinableQueue()
for i in range(num_procs):
p = multiprocessing.Process(target=worker)
p.daemon = True
p.start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
print "q close"
q.join()
#q.close()
print "Finished everything...."
print "num active children:",multiprocessing.active_children()
Run Code Online (Sandbox Code Playgroud)
试试这个:
import multiprocessing
num_procs = 4
def do_work(message):
print "work",message ,"completed"
def worker():
for item in iter( q.get, None ):
do_work(item)
q.task_done()
q.task_done()
q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
procs.append( multiprocessing.Process(target=worker) )
procs[-1].daemon = True
procs[-1].start()
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
q.join()
for p in procs:
q.put( None )
q.join()
for p in procs:
p.join()
print "Finished everything...."
print "num active children:", multiprocessing.active_children()
Run Code Online (Sandbox Code Playgroud)
小智 6
你的工人需要一个哨兵来终止,否则他们只会坐在封锁读物上.请注意,在Q上使用sleep而不是P上的join可以显示状态信息等.
我的首选模板是:
def worker(q,nameStr):
print 'Worker %s started' %nameStr
while True:
item = q.get()
if item is None: # detect sentinel
break
print '%s processed %s' % (nameStr,item) # do something useful
q.task_done()
print 'Worker %s Finished' % nameStr
q.task_done()
q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
nameStr = 'Worker_'+str(i)
p = multiprocessing.Process(target=worker, args=(q,nameStr))
p.daemon = True
p.start()
procs.append(p)
source = ['hi','there','how','are','you','doing']
for item in source:
q.put(item)
for i in range(num_procs):
q.put(None) # send termination sentinel, one for each process
while not q.empty(): # wait for processing to finish
sleep(1) # manage timeouts and status updates etc.
Run Code Online (Sandbox Code Playgroud)
这是一种无哨兵的方法,适用于相对简单的情况,其中您将多个任务放在 a 上JoinableQueue,然后启动消耗这些任务的工作进程,并在它们读取队列“干”后退出。诀窍是使用JoinableQueue.get_nowait()而不是get(). get_nowait()顾名思义,尝试以非阻塞方式从队列中获取值,如果没有任何内容可获取,则会queue.Empty引发异常。工作线程通过退出来处理该异常。
简单的代码来说明原理:
import multiprocessing as mp
from queue import Empty
def worker(q):
while True:
try:
work = q.get_nowait()
# ... do something with `work`
q.task_done()
except Empty:
break # completely done
# main
worknum = 4
jq = mp.JoinableQueue()
# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
jq.put(task)
procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
p.start()
for p in procs:
p.join()
Run Code Online (Sandbox Code Playgroud)
优点是不需要在队列中放入“毒丸”,因此代码会短一些。
重要提示:在更复杂的情况下,生产者和消费者以“交错”的方式使用相同的队列,并且工作人员可能必须等待新任务的出现,应该使用“毒丸”方法。我上面的建议是针对简单的情况,工作人员“知道”如果任务队列是空的,那么就没有必要再犹豫了。
| 归档时间: |
|
| 查看次数: |
27332 次 |
| 最近记录: |