Java中最快的循环同步是什么(ExecutorService与CyclicBarrier vs.X)?

15 java performance multithreading executorservice cyclicbarrier

哪个Java同步构造可能为具有固定数量线程的并发迭代处理场景提供最佳性能,如下所述?在我自己试验了一段时间后(使用ExecutorService和CyclicBarrier)并对结果感到有些惊讶,我会感谢一些专家建议和一些新想法.这里的现有问题似乎并不主要关注绩效,因此这个新问题.提前致谢!

该应用程序的核心是一个简单的迭代数据处理算法,与Mac Pro上8个内核的计算负载并行,运行OS X 10.6和Java 1.6.0_07.要处理的数据被分成8个块,每个块被送到Runnable,由固定数量的线程之一执行.并行化算法是相当简单的,它在功能上按预期工作,但它的性能还不是我认为的可能.该应用程序似乎花了很多时间在系统调用同步,所以经过一些分析后,我想知道我是否选择了最合适的同步机制.

该算法的一个关键要求是它需要分阶段进行,因此线程需要在每个阶段结束时进行同步.主线程准备工作(非常低的开销),将它传递给线程,让它们处理它,然后在所有线程完成后继续,重新安排工作(再次非常低的开销)并重复循环.机器专用于此任务,通过使用预分配项的每线程池来最小化垃圾收集,并且可以修复线程数(没有传入请求等,每个CPU核心只有一个线程).

V1 - ExecutorService

我的第一个实现使用了一个带有8个工作线程的ExecutorService.该程序创建8个任务来完成工作,然后让他们处理它,大致如下:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}
Run Code Online (Sandbox Code Playgroud)

这在功能上运行良好(它做它应该做的事情),对于非常大的工作项,确实所有8个CPU都变得高负载,就像预期允许的处理算法一样(一些工作项将比其他工作项完成得更快,然后空闲) .但是,随着工作项变小(并且实际上不受程序控制),用户CPU负载急剧缩小:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626
Run Code Online (Sandbox Code Playgroud)

图例: - 块大小=工作项的大小(=计算步骤) - 系统=系统负载,如OS X活动监视器(红色条)所示 - 用户=用户加载,如OS X活动监视器(绿色条)所示 - cycles/sec =通过主while循环的迭代,越多越好

这里关注的主要领域是在系统中花费的高百分比时间,这似乎是由线程同步调用驱动的.正如所料,对于较小的工作项,ExecutorService.invokeAll()将需要相对更多的努力来同步线程与每个线程中正在执行的工作量.但是,由于ExecutorService比这个用例需要的更通用(如果有多个任务而不是内核,它可以为线程排队任务),我可能会有一个更精简的同步构造.

V2 - CyclicBarrier

下一个实现使用CyclicBarrier在接收工作之前和完成之后同步线程,大致如下:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

同样,这在功能上运行良好(它做它应该做的),对于非常大的工作项,确实所有8个CPU都变得高负载,如前所述.但是,随着工作项目变小,负载仍然急剧缩小:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377
Run Code Online (Sandbox Code Playgroud)

对于大型工作项,同步可忽略不计,性能与V1相同.但出乎意料的是,(高度专业化)CyclicBarrier的结果似乎比(通用)ExecutorService的结果要好得多:吞吐量(周期/秒)仅为V1的1/4左右.初步结论是,即使这似乎是CyclicBarrier的广告理想用例,它的执行情况比通用的ExecutorService差得多.

V3 - 等待/通知+ CyclicBarrier

似乎值得尝试用简单的等待/通知机制替换第一个循环障碍await():

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

再次,这在功能上运作良好(它做它应该做的).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679
Run Code Online (Sandbox Code Playgroud)

小工作项的吞吐量仍然比ExecutorService差,但大约是CyclicBarrier的2倍.消除一个CyclicBarrier消除了一半的差距.

V4 - 忙等待而不是等待/通知

由于这个应用程序是在系统上运行的主要应用程序,并且如果它们不忙于工作项目,核心空闲,为什么不尝试忙于等待每个线程中的工作项,即使这会不必要地旋转CPU.工作线程代码更改如下:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

功能也很好(它做它应该做的).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741
Run Code Online (Sandbox Code Playgroud)

对于小型工作项,与CyclicBarrier +等待/通知变体相比,这会使吞吐量进一步提高10%,这并非无关紧要.但使用ExecutorService,它的吞吐量仍远低于V1.

V5 - ?

那么这种(可能并非罕见的)问题的最佳同步机制是什么?我厌倦了编写自己的同步机制来完全替换ExecutorService(假设它太通用了,必须有一些东西仍然可以取出来使其更高效).这不是我的专业领域,我担心我会花很多时间调试它(因为我甚至不确定我的等待/通知和忙等待变体是否正确)以获得不确定的收益.

任何建议将不胜感激.

小智 6

看起来你似乎不需要工人之间的任何同步.也许你应该考虑使用Java 7中提供的ForkJoin框架,以及一个单独的库.一些链接:


小智 1

更新:V5 - 所有线程中的忙等待(到目前为止似乎是最佳的)

由于所有核心都专用于此任务,因此似乎值得尝试简单地消除所有复杂的同步构造并在所有线程中的每个同步点处进行繁忙等待。事实证明,这大大击败了所有其他方法。

设置如下:从上面的V4开始(CyclicBarrier + Busy Wait)。将 CyclicBarrier 替换为 AtomicInteger,主线程在每个周期将其重置为零。每个完成其工作的 Runnable 工作线程都会将原子整数加一。主线程忙等待:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}
Run Code Online (Sandbox Code Playgroud)

只启动了 7 个工作线程,而不是 8 个(因为所有线程,包括主线程,现在几乎完全加载了一个核心)。结果如下:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129
Run Code Online (Sandbox Code Playgroud)

在工作线程中使用等待/通知会将吞吐量减少到该解决方案的大约 1/3。