尽管队列已满,Python多处理队列get()超时

And*_*nes 6 python multiprocessing python-multiprocessing

我正在使用Python的多处理模块来进行科学的并行处理。在我的代码中,我使用了多个繁重的工作流程以及一个将结果持久保存到磁盘的写入程序。要写入的数据通过队列从工作进程发送到写程序进程。数据本身非常简单,仅由一个包含文件名的元组和带有两个浮点数的列表组成。经过几个小时的处理,编写器进程通常会卡住。更准确地说,以下代码块

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
Run Code Online (Sandbox Code Playgroud)

永远不会退出循环,我会收到连续的“超时”消息。

我还实现了一个日志记录过程,该过程除其他外输出队列的状态,即使我收到上面的超时错误消息,对qsize()的调用也会不断返回完整的队列(在我的情况下为size = 48)。

我已经彻底检查了队列对象的文档,并且找不到关于为什么同时队列已满的get()返回超时的可能解释。

有任何想法吗?

编辑:

我修改了代码以确保捕获到空队列异常:

while (True):
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
Run Code Online (Sandbox Code Playgroud)

Tom*_*ota 4

在多处理队列中用作同步消息队列。您的问题似乎也是如此。然而,这需要的不仅仅是调用get()方法。处理每个任务后,您需要调用task_done()以便从队列中删除元素。

来自文档:

Queue.task_done()

指示先前排队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个 get(),随后对 task_done() 的调用会告诉队列该任务的处理已完成。

如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(这意味着对于已 put() 到队列中的每个项目都会收到一个 task_done() 调用)。

在文档中,您还可以找到正确使用线程队列的代码示例。

如果你的代码应该是这样的

while (True):
    try:
        item = queue.get(timeout=60)
        if item is None:
            break
        # call working fuction here
        queue.task_done()
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
Run Code Online (Sandbox Code Playgroud)