如何使用multiprocessing.Queue.get方法?

alp*_*ric 3 python queue multiprocessing python-multiprocessing

下面的代码将三个数字放入队列中。然后它尝试从队列中取回号码。但它永远不会。如何从队列中获取数据?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()
Run Code Online (Sandbox Code Playgroud)

Dar*_*aut 5

我最初在阅读@Martijn Pieters'后删除了这个答案,因为他更详细和更早地描述了“为什么这不起作用”。然后\n我意识到,OP 示例中的用例不太符合规范的听起来标题

\n\n
\n

“如何使用 multiprocessing.Queue.get 方法”。

\n
\n\n

这并不是因为没有子进程参与演示,而是因为在实际应用程序中几乎不会预先填充队列并且仅在之后读出,而是读取和写入与之间的等待时间交错发生。Martijn 展示的扩展演示代码在通常情况下不起作用,因为当排队跟不上读取时, while 循环会很快中断。所以这里是重新加载的答案,它能够处理通常的交错提要和读取场景:

\n\n
\n\n

不要依赖queue.empty 检查来进行同步。

\n\n
\n

将对象放入空队列后,在queue\xe2\x80\x99s的empty()方法返回False之前可能会有无限小的延迟,并且get_nowait()可以在不引发queue.Empty的情况下返回。\n ...

\n\n

空的()

\n\n

如果队列为空则返回 True,否则返回 False。由于多线程/多处理语义,这是不可靠的。文档

\n
\n\n

要么使用for msg in iter(queue.get, sentinel):to .get()from 队列,通过传递哨兵值来打破循环... iter(callable, sentinel)?

\n\n
from multiprocessing import Queue\n\nSENTINEL = None\n\nif __name__ == \'__main__\':\n\n    queue = Queue()\n\n    for i in [*range(3), SENTINEL]:\n        queue.put(i)\n\n    for msg in iter(queue.get, SENTINEL):\n        print(msg)\n
Run Code Online (Sandbox Code Playgroud)\n\n

...或者如果您需要非阻塞解决方案,则使用get_nowait()并处理可能的异常。queue.Empty

\n\n
from multiprocessing import Queue\nfrom queue import Empty\nimport time\n\nSENTINEL = None\n\nif __name__ == \'__main__\':\n\n    queue = Queue()\n\n    for i in [*range(3), SENTINEL]:\n        queue.put(i)\n\n    while True:\n        try:\n            msg = queue.get_nowait()\n            if msg == SENTINEL:\n                break\n            print(msg)\n        except Empty:\n            # do other stuff\n            time.sleep(0.1)\n
Run Code Online (Sandbox Code Playgroud)\n\n

如果只有一个进程并且该进程中只有一个线程正在读取队列,则也可以将最后一个代码片段与以下内容交换:

\n\n
while True:\n    if not queue.empty():  # this is not an atomic operation ...\n        msg = queue.get()  # ... thread could be interrupted in between\n        if msg == SENTINEL:\n            break\n        print(msg)\n    else:\n        # do other stuff\n        time.sleep(0.1)\n
Run Code Online (Sandbox Code Playgroud)\n\n

由于线程可能会在检查和之间删除GIL,因此这不适合进程中的多线程队列读取。如果多个进程正在从队列中读取数据,则同样适用。if not queue.empty()queue.get()

\n\n

不过,对于单生产者/单消费者场景,使用 amultiprocessing.Pipe代替multiprocessing.Queue就足够了并且性能更高。

\n