嵌套的Java 8并行forEach循环表现不佳.这种行为有望吗?

Chr*_*ies 19 java parallel-processing concurrency java-8 java-stream

注意:我已经在另一个SO帖子中解决了这个问题 - 在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK.这是一个错误吗? - 但是这篇文章的标题表明问题与使用信号量有关 - 这有点分散了讨论的注意力.我正在创建这个,以强调嵌套循环可能有性能问题 - 虽然这两个问题可能是一个共同的原因(也许是因为我花了很多时间来弄清楚这个问题).(我不认为它是重复的,因为它强调另一种症状 - 但如果你只是删除它).

问题:如果嵌套两个Java 8 stream.parallel().forEach循环并且所有任务都是独立的,无状态的等等 - 除了提交到公共FJ池 - 然后在并行循环内嵌套并行循环执行得更差而不是在并行循环内嵌套顺序循环.更糟糕的是:如果同步包含内循环的操作,您将获得DEADLOCK.

演示性能问题

如果没有"同步",您仍然可以观察到性能问题.您可以在以下网址找到演示代码:http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (有关更详细的说明,请参阅JavaDoc).

我们的设置如下:我们有一个嵌套的stream.parallel().forEach().

  • 内环是独立的(无状态,无干扰等 - 除了使用公共池之外)并且在最坏的情况下总共消耗1秒,即如果处理顺序.
  • 外循环的一半任务在该循环之前消耗10秒.
  • 在该循环之后,一半消耗10秒.
  • 因此,每个线程总共消耗11秒(最坏情况).*我们有一个布尔值,允许将内部循环从parallel()切换到sequential().

现在:将24个外循环任务提交给具有并行性的池8我们期望24/8*11 =最多33秒(在8核或更好的机器上).

结果是:

  • 内部顺序循环:33秒.
  • 内部并行循环:> 80秒(我有92秒).

问题:你能证实这种行为吗?这是人们对框架的期望吗?(我现在更加小心,声称这是一个错误,但我个人认为这是由于ForkJoinTask的实现中的一个错误.备注:我已将此发布到并发兴趣(请参阅http:// cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html),但到目前为止我没有得到确认).

证明了僵局

以下代码将为DEADLOCK

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });
Run Code Online (Sandbox Code Playgroud)

其中numberOfTasksInOuterLoop = 24,numberOfTasksInInnerLoop = 240,outerLoopOverheadFactor = 10000doWork一些无状态的CPU刻录机.

您可以在http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java中找到完整的演示代码 (有关更详细的说明,请参阅JavaDoc).

这种行为有望吗?请注意,有关Java并行流的文档未提及嵌套或同步的任何问题.此外,未提及使用公共fork-join-pool的事实.

更新

关于性能问题的另一个测试可以在http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java找到 - 这个测试没有任何阻塞操作(没有线程) .sleep和not synchronized).我在这里写了一些更多的评论:http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

更新2

似乎这个问题和更严重的带有信号量的DEADLOCK已在Java8 u40中得到修复.

Hol*_*ger 5

问题是你配置的相当有限的并行性被外部流处理吃掉了:如果你说你需要8个线程并处理超过8个项目的流,parallel()它将创建8个工作线程并让它们处理项目.

然后在您的消费者中,您正在处理另一个流,parallel()但没有剩余工作线程.由于工作线程被阻塞等待内部流处理的结束,因此ForkJoinPool必须创建违反配置的并行性的新工作线程.在我看来,它不会回收这些延伸线程,但让它们在处理后立即死亡.因此,在内部处理过程中,会创建和处理新线程,这是一项昂贵的操作.

你可能会认为它是一个缺陷,启动线程不会对并行流处理的计算做出贡献,只是等待结果,但即使修复了这个问题,你仍然会遇到一个很难解决的常见问题(如果有的话):

每当工作线程数与外部流项目之间的比率较低时,实现将全部用于外部流,因为它不知道该流是外部流.因此,并行执行内部流请求的工作线程数多于可用数量.使用调用程序线程为计算做出贡献可以通过性能等于串行计算的方式来修复它,但是在这里获得并行执行的优势并不能与固定数量的工作线程的概念一起使用.

请注意,此处您正在研究此问题的表面,因为您有相当平衡的项目处理时间.如果内部项目和外部项目的处理分歧(与同一级别的项目相比),则问题将更加严重.


更新:通过分析和查看代码,似乎ForkJoinPool 确实尝试使用等待线程进行"工作窃取",但使用不同的代码取决于是否Thread是工作线程或其他线程的事实.结果,一个工作线程实际上正在等待大约80%的时间并且做很少甚至没有工作,而其他线程确实有助于计算......


更新2:为了完整性,这里是注释中描述的简单并行执行方法.由于它将每个项目排入队列,因此当单个项目的执行时间相当小时,它会产生很大的开销.所以它不是一个复杂的解决方案,而是一个可以处理长时间运行的任务而不需要太多魔法的演示......

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)