在Fork / Join上下文中,Phaser和CyclicBarrier

Ade*_*lin 7 java multithreading

在尝试了解Phaser和CyclicBarrier之间的区别时,我遇到了一些链接 。Phaser和CyclicBarrierhttps://www.infoq.com/news/2008/07/phasers/ 之间的区别 我读到Phaser与Fork /兼容。如果不是CyclicBarrier,则加入接口,这是演示此代码的代码:

移相器

 public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        Phaser phaser = new Phaser(16){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase ==1 || super.onAdvance(phase, registeredParties);
            }
        };

        System.out.println("Available Processors: "+Runtime.getRuntime().availableProcessors());

        ExecutorService executorService = ForkJoinPool.commonPool(); // Runtime.getRuntime().availableProcessors() -1

        for (int i = 0; i < 16; i++) {
            final int count = 0;
            executorService.submit(() -> {
                while (!phaser.isTerminated()) {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                        System.out.println(Thread.currentThread().getName() + count + " ... ");
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + count + " ... continues ... ");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
    }
Run Code Online (Sandbox Code Playgroud)

循环屏障

public static void main(String[] args) throws InterruptedException {

        AtomicInteger phases = new AtomicInteger();
        CountDownLatch  countDownLatch = new CountDownLatch(1);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(16, () -> phases.incrementAndGet());

        ExecutorService executorService = ForkJoinPool.commonPool();

        for (int i = 0; i < 16; i++) {
            executorService.submit(() -> {
                while (phases.get() < 1) {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        System.out.println(Thread.currentThread().getName() + " Ok, I am waiting ");

                        cyclicBarrier.await();

                        System.out.println(Thread.currentThread().getName() + " continued it's way ... ");
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
    }
Run Code Online (Sandbox Code Playgroud)

说明:

这两个代码运行一个fork / join线程池,这意味着这些线程是守护程序线程,这就是为什么我使用CountDownLatch。该方法commonPool()将创建一个线程池,其中线程等于Runtime.getRuntime().availableProcessors(),我的线程数为12,因此它将创建12个线程。两个示例中的Phaser和CyclicBarrier都定义了16个参与方,即,他们需要16次调用await(),在循环障碍中和arriveAndAwaitAdvance()Phaser中继续。

在使用相位器的示例中,当第12个线程被阻止时,fork / join将产生更多线程,它将创建更多线程,因此相位器最终将终止。但是,使用CyclicBarrier时,第12个线程到达时await()程序停止且永不前进,则挂起。显然,由于屏障需要进行16次调用,才能使线程前进,而创建的线程只能进行12次调用。线程池不会像使用相位器那样创建更多线程来推进CyclicBarrier。

问题:

fork / join如何通过Phaser而不是CyclicBarrier设法创建更多线程?为什么这些方法arriveAndAwaitAdvance()使线程池创建新线程,以及如何使线程池await()创建更多线程?

Ale*_*aux 3

Phaser 之所以能够做到这一点,是因为它ForkJoinPool.managedBlock(ManagedBlocker)在阻塞线程时会在内部调用。

ForkJoinPool 的这个 API 可供任何人访问,因此您可以轻松增强CyclicBarrier版本以使用它,并消除线程饥饿。例如,有以下氛围的东西:

ForkJoinPool.managedBlock(new ManagedBlocker() {

    boolean isReleasable = false;

    @Override
    public boolean block() throws InterruptedException {
        try {
            cyclicBarrier.await();
        } catch (BrokenBarrierException aE) {
            throw new IllegalStateException(aE);
        }
        return isReleasable = true;
    }

    @Override
    public boolean isReleasable() {
        return isReleasable;
    }
});
Run Code Online (Sandbox Code Playgroud)