Ghe*_*Ace 5 python multithreading python-2.7 python-multiprocessing
我有这个使用模块中的Queue
类的代码片段multiprocess
.我很困惑,.empty()
一个实例的方法Queue
并没有像我期望的那样给我一个正确的值.这是我的代码:
from time import sleep
from multiprocessing import Queue, Lock
foo = Queue()
locker = Lock()
with locker: # even with this, still True
foo.put("bar")
print(foo.empty()) # True, obviously not
print(foo.empty()) # True
print(foo.empty()) # True
print(foo.qsize()) # 1L
print(foo.empty()) # True
Run Code Online (Sandbox Code Playgroud)
但是,如果我使用sleep
函数time
,因为在执行中会导致按时间顺序延迟.有用.
from time import sleep
from multiprocessing import Queue, Lock
foo = Queue()
locker = Lock()
foo.put("bar")
sleep(0.01)
print(foo.empty()) # False
print(foo.empty()) # False
print(foo.empty()) # False
print(foo.qsize()) # 1L
print(foo.empty()) # False
Run Code Online (Sandbox Code Playgroud)
我知道我的另一种选择是.qsize() > 0
表达,但我确信我只是以错误的方式做这件事.
我究竟做错了什么?
*编辑*
我现在明白这是不可靠的,谢谢@Mathias Ettinger.任何干净的选择?我需要知道热才能可靠地判断我Queue
是否是空的.
不幸的是,Queue的复杂实现意味着.empty()
不能保证立即正确,即使.qsize()
是.
由于.qsize()
是支持你的平台(这是不是真的无处不在),你可以重新实现.empty()
来讲检查.qsize()
,这会为你工作:
# mp.Queue() is a function, not a class, so we need to find the true class
# to subclass
import multiprocessing.queues
class XQueue(multiprocessing.queues.Queue):
def empty(self):
try:
return self.qsize() == 0
except NotImplementedError: # OS X -- see qsize() implementation
return super(XQueue, self).empty()
Run Code Online (Sandbox Code Playgroud)
在引擎盖下,Queue .put()
将您的对象添加到缓冲区并增加进程间信号量,而隐藏的守护进程线程负责排空缓冲区并将其内容序列化为管道.(消费者然后.get()
通过读取此管道.)因此,这就是为什么在您的示例中工作的原因:守护程序线程有足够的时间在调用之前将对象从内存缓冲区移动到I/O表示.empty()
.
顺便说一句,我发现这种行为非常令人惊讶.通常,当我们说队列的大小方法(空/满/计数)是"不可靠"时,我们的意思是它们准确且一致,但可能会立即失效,因为另一个生产者或消费者可能已经改变了队列.(Queue.Queue
例如,大多数单进程多线程队列,例如python ,都是"不可靠的",但在这种意义上是一致的.)在这种情况下,默认.empty()
方法实际上可能与队列状态不一致.
按照该文件,既没有empty()
,full()
也不qsize()
是可靠的.
替代方案包括:
阅读通过以下内容的确切数量的项目Queue
:
AMT = 8
for _ in range(AMT):
queue.put('some stuff')
for _ in range(AMT):
print(queue.get())
Run Code Online (Sandbox Code Playgroud)
如果您事先知道必须处理多少项目或每个线程将处理多少项,这将非常有用.
在监护人出现之前阅读物品:
num_threads = 8
guardian = 'STUFF DONE'
while num_threads:
item = queue.get()
if item == guardian:
num_threads -= 1
else:
process(item)
Run Code Online (Sandbox Code Playgroud)
如果每个线程都有可变的工作量(并且您事先不知道总数),这会有用,但可以确定何时完成.