Python中的多处理队列

aer*_*ain 12 python multiprocessing

我正在尝试在Python中使用带有多处理库的队列.执行下面的代码后(打印语句工作),但是我在队列上调用join后进程没有退出并且仍然存在.如何终止剩余的流程?

谢谢!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()
Run Code Online (Sandbox Code Playgroud)

und*_*run 9

试试这个:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):
    do_work(item)
    q.task_done()
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()

for p in procs:
  q.put( None )

q.join()

for p in procs:
  p.join()

print "Finished everything...."
print "num active children:", multiprocessing.active_children()
Run Code Online (Sandbox Code Playgroud)

  • @Dilettant - 不,这实际上不会改变任何东西.创建进程时,q在全局命名空间中可用,因此工作程序在调用时实际上会有q的副本.在调用`multiprocessing.Process`中指定`args =(q,)`可能会更好,因为那时我们明确地分享了这个项目 - 这有助于养成习惯,这样你就可以避免意外地分享你应该分享的东西/不能分享. (2认同)

小智 6

你的工人需要一个哨兵来终止,否则他们只会坐在封锁读物上.请注意,在Q上使用sleep而不是P上的join可以显示状态信息等.
我的首选模板是:

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.
Run Code Online (Sandbox Code Playgroud)

  • 尽管不是q.empty(),但是只有当工人抓紧要完成的最后一件工作时,这才不是可靠的方法来知道处理是否完成。坦白地说,由于您不正确地使用JoinableQueue,因此不需要JoinableQueue。如果选择不使用一个,则不需要工作线程来标记task_done。使用这样的队列的目的是让您可以加入它,而这恰恰是您在程序末尾要执行的操作,而不是等待队列为空。 (2认同)

Lar*_*dua 5

这是一种无哨兵的方法,适用于相对简单的情况,其中您将多个任务放在 a 上JoinableQueue,然后启动消耗这些任务的工作进程,并在它们读取队列“干”后退出。诀窍是使用JoinableQueue.get_nowait()而不是get(). get_nowait()顾名思义,尝试以非阻塞方式从队列中获取值,如果没有任何内容可获取,则会queue.Empty引发异常。工作线程通过退出来处理该异常。

简单的代码来说明原理:

import multiprocessing as mp
from queue import Empty

def worker(q):
  while True:
    try:
      work = q.get_nowait()
      # ... do something with `work`
      q.task_done()
    except Empty:
      break # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
  jq.put(task)

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
  p.start()

for p in procs:
  p.join()
Run Code Online (Sandbox Code Playgroud)

优点是不需要在队列中放入“毒丸”,因此代码会短一些。

重要提示:在更复杂的情况下,生产者和消费者以“交错”的方式使用相同的队列,并且工作人员可能必须等待新任务的出现,应该使用“毒丸”方法。我上面的建议是针对简单的情况,工作人员“知道”如果任务队列是空的,那么就没有必要再犹豫了。