为什么 LogWriter 中的竞争条件会导致生产者阻塞?【实践中的并发】

gst*_*low 5 java concurrency multithreading shutdown blockingqueue

首先,为了防止不喜欢读到最后的人将问题标记为重复,我已经阅读了“生产者-消费者日志服务”和“关闭问题的不可靠方式” 。但它并没有完全回答问题,并且回答与书本相矛盾。

在书中提供了以下代码:

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在我们应该了解如何停止这个过程。我们应该停止记录但不应该跳过已经提交的消息。

作者研究方法:

public void log(String msg) throws InterruptedException {
     if(!shutdownRequested)
           queue.put(msg);
     else
           throw new IllegalArgumentException("logger is shut down");
 }
Run Code Online (Sandbox Code Playgroud)

并像这样评论:

另一种关闭 LogWriter 的方法是设置“关闭请求”标志以防止提交更多消息,如代码清单 7.14 所示。然后消费者可以在收到关闭请求的通知后排空队列,写出任何挂起的消息并解除阻塞在 log 中的任何生产者。但是,这种方法具有竞争条件,使其不可靠。日志的实现是一个检查然后行动的序列:生产者可以观察到服务尚未关闭,但在关闭后仍将消息排队,同样存在生产者可能在日志中被阻塞并且永远不会被解除阻塞的风险。有一些技巧可以减少这种可能性(比如让消费者在宣布队列耗尽之前等待几秒钟),

这句话对我来说已经够难了。

我明白那个

 if(!shutdownRequested)
           queue.put(msg);
Run Code Online (Sandbox Code Playgroud)

不是原子的,关闭后可以将消息添加到队列中。是的,它不是很准确,但我没有看到问题。队列将被排空,当队列为空时,我们可以停止 LoggerThread。尤其不明白为什么生产者会被屏蔽

作者没有提供完整的代码,因此我无法理解所有细节。我相信这本书是社区大多数人阅读的,这个例子有详细的解释。

请用完整的代码示例进行解释。

CKi*_*ing 5

首先要理解的是,当请求关闭时,生产者需要停止接受更多请求,而消费者(LoggerThread在这种情况下)需要排空队列。您在问题中提供的代码仅展示了故事的一方面;生产者拒绝任何进一步的请求时shutdownRequestedtrue。在这个例子之后,作者继续说:

然后消费者可以在收到关闭请求的通知后排空队列,写出任何挂起的消息并解除阻塞在日志中的任何生产者

首先,queue.takeLoggerThread如图你的问题会阻止无限新的消息在队列中供; 然而,如果我们想关闭LoggerThread(优雅地),我们需要确保关闭代码在为真LoggerThread时有机会执行,shutdownRequested而不是被无限阻塞queue.take

当作者说消费者可以排空队列时,他的意思是LogWritter可以检查shutdownRequested如果为真,则可以调用非阻塞的drainTo方法将队列中的当前内容排空到单独的集合中,而不是调用queue.take(或调用类似的非阻塞方法)。Alternatey,如果shutdownRequested为false,则LogWriter可以queue.take照常继续调用。

这种方法的真正问题是log方法(由生产者调用)的实现方式。由于它不是原子的,因此多个线程可能会错过设置shutdownRequested为 true 的情况。如果错过这个更新的线程数比越大,会发生什么CAPACITYqueue。我们再来看看log方法。(为了解释,添加了花括号):

public void log(String msg) throws InterruptedException {
     if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.

           //B. At this point, shutdownRequested is set to true by client code
           //C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls 
           //queue.drainTo to drain all existing messages in the queue instead of `queue.take`. 
           //D. Producers insert the new message into the queue.    
           queue.put(msg);//Step E
     } else
           throw new IllegalArgumentException("logger is shut down");
     }
}
Run Code Online (Sandbox Code Playgroud)

步骤 E所示,putLoggerThread完成排空队列并退出 w 时,多个生产者线程可能会调用。在第 1000 个线程调用之前应该没有问题put。真正的问题是当第1001 个线程调用put. 它将阻塞,因为队列容量只有 1000,并且LoggerThread可能不再处于活动状态或订阅该queue.take方法。