队列与Python中的JoinableQueue

axc*_*tor 16 python queue multiprocessing

在使用多处理模块的Python中,有两种队列:

  • 队列
  • JoinableQueue.

他们之间有什么区别?

队列

from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue
Run Code Online (Sandbox Code Playgroud)

JoinableQueue

from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion
Run Code Online (Sandbox Code Playgroud)

ffe*_*rri 18

JoinableQueue有一些方法join()task_done(),这Queue也没有.


class multiprocessing.Queue([maxsize])

返回使用管道和一些锁/信号量实现的进程共享队列.当进程首先将项目放入队列时,将启动一个馈送线程,该线程将对象从缓冲区传输到管道中.

标准库的Queue模块中通常的Queue.Empty和Queue.Full异常被引发以指示超时.

除了task_done()和join()之外,Queue实现了Queue.Queue的所有方法.


class multiprocessing.JoinableQueue([maxsize])

JoinableQueue是一个Queue子类,是一个队列,它还具有task_done()和join()方法.

task_done()

表示以前排队的任务已完成.由队列使用者线程使用.对于用于获取任务的每个get(),对task_done()的后续调用会告知队列该任务的处理已完成.

如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用).

如果调用的次数超过队列中放置的项目,则引发ValueError.

加入()

阻止,直到队列中的所有项目都已获取并处理完毕.

每当项目添加到队列时,未完成任务的计数就会增加.每当消费者线程调用task_done()以指示该项目已被检索并且其上的所有工作都已完成时,计数就会下降.当未完成任务的数量降至零时,join()取消阻止.


如果你使用JoinableQueue那么你必须调用JoinableQueue.task_done()从队列中删除的每个任务,否则用于计算未完成任务数量的信号量最终可能会溢出,从而引发异常.

  • 除了引用文档 - 这个答案带来了什么?一个例子和一些解释会很好 (8认同)
  • 您可以添加示例代码吗?这将改善答案。目前它对我没有太大帮助。对不起。 (2认同)

小智 6

根据文档,很难确定它Queue实际上是空的。您可以通过JoinableQueue调用 来等待队列清空q.join()。如果您想要分批完成工作,并在每个批次结束时执行一些离散的操作,这可能会有所帮助。

例如,您可能一次通过队列处理 1000 个项目,然后向用户发送推送通知,告知您已完成另一批处理。这对于普通的Queue.

它可能看起来像:

import multiprocessing as mp

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

def consume(q):
  for item in iter(q.get, STOP_VALUE):
    try:
      process(item)
    # Be very defensive about errors since they can corrupt pipes.
    except Exception as e:
      logger.error(e)
    finally:
      q.task_done()

q = mp.JoinableQueue()
with mp.Pool() as pool:
  # Pull items off queue as fast as we can whenever they're ready.
  for _ in range(mp.cpu_count()):
    pool.apply_async(consume, q)
  for i in range(0, len(URLS), BATCH_SIZE):
    # Put `BATCH_SIZE` items in queue asynchronously.
    pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
    # Wait for the queue to empty.
    q.join()
    notify_users()
  # Stop the consumers so we can exit cleanly.
  for _ in range(mp.cpu_count()):
    q.put(STOP_VALUE)
Run Code Online (Sandbox Code Playgroud)

注意:我实际上还没有运行过这段代码。如果你从队列中取出物品的速度比放入物品的速度快,你可能会提前完成。在这种情况下,此代码至少每 1000 个项目发送一次更新,甚至可能更频繁。对于进度更新,这可能没问题。如果精确到 1000 很重要,您可以使用mp.Value('i', 0)并在每次发布时检查它是否为 1000 join