我知道文档说该对象是线程安全的,但这是否意味着所有方法对它的所有访问都是线程安全的?因此,如果我一次从多个线程调用put()并在同一个实例上调用(),那么会发生什么不好的事情吗?
我问,因为这个答案让我第二次猜测:https: //stackoverflow.com/a/22006181/4164238
Chr*_*s K 47
快速回答是肯定的,它们是线程安全的.但是不要把它留在那里......
首先是一个小房子保持,BlockingQueue
是一个接口,任何非线程安全的实现将打破记录的合同.您包含的链接指的是它LinkedBlockingQueue
,它有一些聪明.
你包含的链接是一个有趣的观察,是的,其中有两个锁LinkedBlockingQueue
.然而,它无法理解"简单"实现可能违反的边缘情况实际上正在被处理,这就是为什么take和put方法比最初期望的更复杂.
LinkedBlockingQueue
优化以避免在读取和写入时使用相同的锁定,这减少了争用,但是对于正确的行为,它依赖于队列不为空.当队列中包含元素时,推送和弹出点不在同一内存区域,并且可以避免争用.但是当队列为空时,无法避免争用,因此需要额外的代码来处理这种常见的"边缘"情况.这是代码复杂性和性能/可伸缩性之间的常见折衷.
接下来的问题是,如何LinkedBlockingQueue
知道队列何时为空/非空并因此处理线程呢?答案是它使用a AtomicInteger
和a Condition
作为两个额外的并发数据结构.所述AtomicInteger
用于检查队列的长度是否是零,并且条件是用于等待一个信号时,队列可能是所希望的状态,以通知一个等待的线程.这种额外的协调确实有开销,但是在测量中已经表明,当增加并发线程的数量时,这种技术的开销低于使用单个锁引入的争用.
下面我复制了代码LinkedBlockingQueue
并添加了解释它们如何工作的注释.在较高级别,take()
首先锁定所有其他呼叫take()
,然后put()
根据需要发出信号. put()
以类似的方式工作,首先它阻止所有其他调用put()
,然后take()
在必要时发出信号.
从put()
方法:
// putLock coordinates the calls to put() only; further coordination
// between put() and take() follows below
putLock.lockInterruptibly();
try {
// block while the queue is full; count is shared between put() and take()
// and is safely visible between cores but prone to change between calls
// a while loop is used because state can change between signals, which is
// why signals get rechecked and resent.. read on to see more of that
while (count.get() == capacity) {
notFull.await();
}
// we know that the queue is not full so add
enqueue(e);
c = count.getAndIncrement();
// if the queue is not full, send a signal to wake up
// any thread that is possibly waiting for the queue to be a little
// emptier -- note that this is logically part of 'take()' but it
// has to be here because take() blocks itself
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
Run Code Online (Sandbox Code Playgroud)
从 take()
takeLock.lockInterruptibly();
try {
// wait for the queue to stop being empty
while (count.get() == 0) {
notEmpty.await();
}
// remove element
x = dequeue();
// decrement shared count
c = count.getAndDecrement();
// send signal that the queue is not empty
// note that this is logically part of put(), but
// for thread coordination reasons is here
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
Run Code Online (Sandbox Code Playgroud)