axc*_*tor 16 python queue multiprocessing
在使用多处理模块的Python中,有两种队列:
他们之间有什么区别?
from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue
Run Code Online (Sandbox Code Playgroud)
from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion
Run Code Online (Sandbox Code Playgroud)
ffe*_*rri 18
JoinableQueue有一些方法join()和task_done(),这Queue也没有.
class multiprocessing.Queue([maxsize])
返回使用管道和一些锁/信号量实现的进程共享队列.当进程首先将项目放入队列时,将启动一个馈送线程,该线程将对象从缓冲区传输到管道中.
标准库的Queue模块中通常的Queue.Empty和Queue.Full异常被引发以指示超时.
除了task_done()和join()之外,Queue实现了Queue.Queue的所有方法.
class multiprocessing.JoinableQueue([maxsize])
JoinableQueue是一个Queue子类,是一个队列,它还具有task_done()和join()方法.
task_done()
表示以前排队的任务已完成.由队列使用者线程使用.对于用于获取任务的每个get(),对task_done()的后续调用会告知队列该任务的处理已完成.
如果join()当前正在阻塞,则它将在所有项目都已处理后恢复(这意味着已为每个已放入队列的项目收到task_done()调用).
如果调用的次数超过队列中放置的项目,则引发ValueError.
加入()
阻止,直到队列中的所有项目都已获取并处理完毕.
每当项目添加到队列时,未完成任务的计数就会增加.每当消费者线程调用task_done()以指示该项目已被检索并且其上的所有工作都已完成时,计数就会下降.当未完成任务的数量降至零时,join()取消阻止.
如果你使用JoinableQueue那么你必须调用JoinableQueue.task_done()从队列中删除的每个任务,否则用于计算未完成任务数量的信号量最终可能会溢出,从而引发异常.
小智 6
根据文档,很难确定它Queue实际上是空的。您可以通过JoinableQueue调用 来等待队列清空q.join()。如果您想要分批完成工作,并在每个批次结束时执行一些离散的操作,这可能会有所帮助。
例如,您可能一次通过队列处理 1000 个项目,然后向用户发送推送通知,告知您已完成另一批处理。这对于普通的Queue.
它可能看起来像:
import multiprocessing as mp
BATCH_SIZE = 1000
STOP_VALUE = 'STOP'
def consume(q):
for item in iter(q.get, STOP_VALUE):
try:
process(item)
# Be very defensive about errors since they can corrupt pipes.
except Exception as e:
logger.error(e)
finally:
q.task_done()
q = mp.JoinableQueue()
with mp.Pool() as pool:
# Pull items off queue as fast as we can whenever they're ready.
for _ in range(mp.cpu_count()):
pool.apply_async(consume, q)
for i in range(0, len(URLS), BATCH_SIZE):
# Put `BATCH_SIZE` items in queue asynchronously.
pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
# Wait for the queue to empty.
q.join()
notify_users()
# Stop the consumers so we can exit cleanly.
for _ in range(mp.cpu_count()):
q.put(STOP_VALUE)
Run Code Online (Sandbox Code Playgroud)
注意:我实际上还没有运行过这段代码。如果你从队列中取出物品的速度比放入物品的速度快,你可能会提前完成。在这种情况下,此代码至少每 1000 个项目发送一次更新,甚至可能更频繁。对于进度更新,这可能没问题。如果精确到 1000 很重要,您可以使用mp.Value('i', 0)并在每次发布时检查它是否为 1000 join。
| 归档时间: |
|
| 查看次数: |
14529 次 |
| 最近记录: |