如何使Python的多处理队列的.empty()方法返回正确的值?还是替代品?

Ghe*_*Ace 5 python multithreading python-2.7 python-multiprocessing

我有这个使用模块中的Queue类的代码片段multiprocess.我很困惑,.empty()一个实例的方法Queue并没有像我期望的那样给我一个正确的值.这是我的代码:

from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

with locker:  # even with this, still True
    foo.put("bar")

print(foo.empty())  # True, obviously not
print(foo.empty())  # True
print(foo.empty())  # True
print(foo.qsize())  # 1L
print(foo.empty())  # True
Run Code Online (Sandbox Code Playgroud)

但是,如果我使用sleep函数time,因为在执行中会导致按时间顺序延迟.有用.

from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

foo.put("bar")

sleep(0.01)

print(foo.empty())  # False
print(foo.empty())  # False
print(foo.empty())  # False
print(foo.qsize())  # 1L
print(foo.empty())  # False
Run Code Online (Sandbox Code Playgroud)

我知道我的另一种选择是.qsize() > 0表达,但我确信我只是以错误的方式做这件事.

我究竟做错了什么?

*编辑*

我现在明白这是不可靠的,谢谢@Mathias Ettinger.任何干净的选择?我需要知道热才能可靠地判断我Queue是否是空的.

pil*_*row 7

不幸的是,Queue的复杂实现意味着.empty()不能保证立即正确,即使.qsize()是.

由于.qsize() 支持你的平台(这是不是真的无处不在),你可以重新实现.empty()来讲检查.qsize(),这会为你工作:

# mp.Queue() is a function, not a class, so we need to find the true class
# to subclass
import multiprocessing.queues

class XQueue(multiprocessing.queues.Queue):
    def empty(self):
        try:
            return self.qsize() == 0
        except NotImplementedError:  # OS X -- see qsize() implementation
            return super(XQueue, self).empty()
Run Code Online (Sandbox Code Playgroud)

在引擎盖下,Queue .put()将您的对象添加到缓冲区并增加进程间信号量,而隐藏的守护进程线程负责排空缓冲区并将其内容序列化为管道.(消费者然后.get()通过读取此管道.)因此,这就是为什么在您的示例中工作的原因:守护程序线程有足够的时间在调用之前将对象从内存缓冲区移动到I/O表示.empty().

顺便说一句,我发现这种行为非常令人惊讶.通常,当我们说队列的大小方法(空/满/计数)是"不可靠"时,我们的意思是它们准确且一致,但可能会立即失效,因为另一个生产者或消费者可能已经改变了队列.(Queue.Queue例如,大多数单进程多线程队列,例如python ,都是"不可靠的",但在这种意义上是一致的.)在这种情况下,默认.empty()方法实际上可能与队列状态不一致.


409*_*ict 6

按照该文件,既没有empty(),full()也不qsize()是可靠的.

替代方案包括:

  • 阅读通过以下内容的确切数量的项目Queue:

    AMT = 8
    for _ in range(AMT):
        queue.put('some stuff')
    
    for _ in range(AMT):
        print(queue.get())
    
    Run Code Online (Sandbox Code Playgroud)

    如果您事先知道必须处理多少项目或每个线程将处理多少项,这将非常有用.

  • 在监护人出现之前阅读物品:

    num_threads = 8
    guardian = 'STUFF DONE'
    
    while num_threads:
        item = queue.get()
        if item == guardian:
            num_threads -= 1
        else:
            process(item)
    
    Run Code Online (Sandbox Code Playgroud)

    如果每个线程都有可变的工作量(并且您事先不知道总数),这会有用,但可以确定何时完成.