BlockingQueue的实现:SynchronousQueue和LinkedBlockingQueue之间有什么区别

nan*_*nda 32 java collections concurrency java.util.concurrent

我看到BlockingQueue的这些实现并无法理解它们之间的差异.我的结论到目前为止:

  1. 我不会需要SynchronousQueue
  2. LinkedBlockingQueue确保FIFO,必须使用参数true创建BlockingQueue以使其成为FIFO
  3. SynchronousQueue打破了大多数集合方法(包含,大小等)

那我什么时候需要SynchronousQueue?这种实现的性能是否优于LinkedBlockingQueue

为了使它更复杂...为什么当其他人(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用LinkedBlockingQueue 时,Executors.newCachedThreadPool使用SynchronousQueue ?

编辑

第一个问题解决了.但我仍然不明白为什么Executors.newCachedThreadPool使用SynchronousQueue时其他人(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用LinkedBlockingQueue?

我得到的是,使用SynchronousQueue,如果没有自由线程,生产者将被阻止.但由于线程数实际上是无限的(如果需要,将创建新线程),这将永远不会发生.那为什么要使用SynchronousQueue呢?

axt*_*avt 52

SynchronousQueue是一种非常特殊的队列 - 它实现了一种集合方法(生成器等待消费者准备就绪,消费者等到生产者准备就绪)后面的界面Queue.

因此,只有在需要特定语义的特殊情况下才需要它,例如,单线程处理任务而不排队进一步的请求.

使用的另一个原因SynchronousQueue是性能.实现SynchronousQueue似乎是大大优化的,所以如果你不需要任何东西而不是一个集合点(Executors.newCachedThreadPool()如果消费者是按需创建的,那么队列项不会累积),你可以通过使用获得性能提升SynchronousQueue.

简单的综合测试表明,在一个简单的单一生产者 - 双核机器吞吐量的单个消费者场景比SynchronousQueue吞吐量LinkedBlockingQueueArrayBlockingQueue队列长度= 1 高约20倍.当队列长度增加时,它们的吞吐量增加并且几乎达到吞吐量SynchronousQueue.这意味着SynchronousQueue与其他队列相比,多核计算机上的同步开销较低.但同样,只有在你需要一个伪装成会合点的特定情况下才有意义Queue.

编辑:

这是一个测试:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    
Run Code Online (Sandbox Code Playgroud)

这是我机器上的结果:

在此输入图像描述


Jed*_*ith 5

目前,默认Executors(ThreadPoolExecutor基于)可以使用一组预先创建的固定大小和BlockingQueue一些大小的线程用于任何溢出,或者创建最大大小的线程(如果(并且仅当)该队列已满).

这导致了一些令人惊讶的特性.例如,由于仅在达到队列容量时才创建其他线程,因此使用LinkedBlockingQueue(无限制)意味着即使当前池大小为零,也永远不会创建新线程.如果您使用a,ArrayBlockingQueue则仅在新线程已满时创建新线程,并且如果池尚未清空空间,则有可能会拒绝后续作业.

A SynchronousQueue具有零容量,因此生产者阻塞直到消费者可用,或者创建一个线程.这意味着尽管@axtavt产生了令人印象深刻的数字,但从生产者的角度来看,缓存的线程池通常具有最差的性能.

不幸的是,目前还没有一个很好的库版本的折衷实现,它将在突发或活动期间创建线程,从最低的最低值开始.你有一个可成长的游泳池或一个固定的游泳池.我们内部有一个,但还没有为公众消费做好准备.

  • 你是什​​么意思你使用LinkedBlockingQueue缓存线程池?它们都使用相同的ThreadPoolExecutor,并且当且仅当(a)不是已经运行的corePoolSize线程时才创建线程(b)不是可用于处理作业的线程(c)当作业被拒绝时提供给队列和(d)尚未达到maximumPoolSize.否则它会拒绝.因此,只有在使用LinkedBlockingQueue进行构造时,才能获得超出核心大小的线程创建,而LinkedBlockingQueue的max-capacity远小于Integer.MAX_VALUE.它只会在队列满时才会增长. (2认同)