Dumping a multiprocessing.Queue into a list

Ram*_*hum 21 python queue multiprocessing

I want to dump a multiprocessing.Queue into a list. For that task I've written the following function:

from queue import Empty  # Python 3; on Python 2 use `from Queue import Empty`

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items "
                  "left" % (len(result), current_size))
            # END DEBUG CODE

            return result

But for some reason it doesn't work.

Observe the following shell session:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

What's going on here? Why aren't all the items being dumped?

jno*_*ler 25

Try this:

import multiprocessing
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # iter(callable, sentinel) calls queue.get() until it returns 'STOP'
    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')  # sentinel marking the end of the queue
l = dump_queue(q)
print(len(l))

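A multiprocessing queue has an internal buffer, and a feeder thread pulls work off that buffer and flushes it to the pipe. If not all of the objects have been flushed yet, I could see a case where Empty is raised prematurely. Using a sentinel to indicate the end of the queue is safe (and reliable). Also, the iter(get, sentinel) idiom is just better than relying on Empty.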

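I don't like that it could raise Empty because of flush timing. (I added the time.sleep(.1) to allow a context switch to the feeder thread. You may not need it, since it works without it; releasing the GIL is just a habit.)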
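As a rough illustration of the flush-timing point above, here is a minimal sketch that waits a short while before deciding the queue is empty, rather than using `block=False`. The helper name `dump_queue_with_timeout` and the 0.5 second default are assumptions made for this example, not something from the answer. A timeout only makes a premature Empty less likely; it does not guarantee completeness the way a sentinel does.

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception class


def dump_queue_with_timeout(q, timeout=0.5):
    """Drain q, treating it as empty only after `timeout` seconds pass with no item."""
    result = []
    while True:
        try:
            # Blocks for up to `timeout` seconds, which gives the feeder
            # thread time to flush buffered items into the pipe.
            result.append(q.get(timeout=timeout))
        except queue.Empty:
            return result


q = multiprocessing.Queue()
for i in range(50):
    q.put(i)
drained = dump_queue_with_timeout(q)
```

The cost is that every dump ends with one full timeout of waiting, so this fits best where a short delay is acceptable and no producer can be asked to send a sentinel.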

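  • Good general idea, Jesse, but it is safer and more reliable to use a `uuid` string (or, for threading rather than multiprocessing, a specific `sentinel = object()`) rather than a generic string. Even then, you're in trouble if some other thread dumps the queue at the same time; the only really _safe_ way would be to rely on Queue's internals, alas! ;-) (3 upvotes)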
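The `uuid` suggestion in the comment above could look roughly like the sketch below. The names `SENTINEL` and `dump_until_sentinel` are made up for this example. One assumption worth flagging: with real worker processes the sentinel value should be passed to them explicitly (for instance as a `Process` argument), because a module-level `uuid` may be regenerated when a child re-imports the module.

```python
import multiprocessing
import uuid

# A random string is very unlikely to collide with real payloads, and unlike
# a bare object() it still compares equal after being pickled through the queue.
SENTINEL = "STOP-" + uuid.uuid4().hex


def dump_until_sentinel(q, sentinel=SENTINEL):
    """Collect items from q until the sentinel is received."""
    return list(iter(q.get, sentinel))


q = multiprocessing.Queue()
for i in range(10):
    q.put(i)
q.put("STOP")    # an ordinary payload that a generic 'STOP' sentinel would swallow
q.put(SENTINEL)  # the real end-of-queue marker
items = dump_until_sentinel(q)
```

This still assumes a single consumer: if two consumers drain the same queue, only one of them ever sees the sentinel.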

Mat*_*ujo 9

In some situations everything has already been computed and we just want to convert the Queue.

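from multiprocessing import Queue

shared_queue = Queue()
shared_queue_list = []
...
join()  # All processes are joined
# Note: qsize() is approximate and is not implemented on some platforms (e.g. macOS)
while shared_queue.qsize() != 0:
    shared_queue_list.append(shared_queue.get())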

Now shared_queue_list holds the results as a list.
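If the number of results is known in advance, one possible variation is to skip `qsize()` and call `get()` exactly that many times. The sketch below assumes a hypothetical helper `drain_n` and a hypothetical `expected` count; neither comes from the answer. It can also be run before joining the workers, which matters because the multiprocessing documentation warns that joining a process whose queued items have not yet been consumed can deadlock.

```python
import multiprocessing


def drain_n(q, n, timeout=5.0):
    """Collect exactly n items; raises queue.Empty if one fails to arrive in time."""
    return [q.get(timeout=timeout) for _ in range(n)]


q = multiprocessing.Queue()
expected = 20  # assumed to be known, e.g. one result per submitted task
for i in range(expected):
    q.put(i * i)
results = drain_n(q, expected)
```

Because each `get()` blocks until an item is available, this does not depend on how quickly the feeder thread flushes its buffer.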