具有并行度= 1的串行和并行执行之间的差异

rad*_*pet 10 java multithreading java-stream

请提供给我参考,为什么使用Java Stream API的以下两个析因实现之间的执行时间会有显着差异:

  1. 串行实施
  2. 在自定义派生连接池中执行的并行实现(使用Stream.parallel()),并行度设置为1

我的期望是接近执行时间,但是并行版本的速度提高了2倍。我没有运行任何专门的基准测试,但是即使在冷启动的jvm中,执行时间也不应有太大差异。在下面,我附上两个实现的源代码:

public class FastFactorialSupplier implements FactorialSupplier {
  private final ExecutorService executorService;

  public FastFactorialSupplier(ExecutorService executorService) {
      this.executorService = executorService;
  }

  @Override
  public BigInteger get(long k) {
      try {
          return executorService
                  .submit(
                          () -> LongStream.range(2, k + 1)
                                  .parallel()
                                  .mapToObj(BigInteger::valueOf)
                                  .reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current))
                  )
                  .get();
      } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
      }

      return BigInteger.ZERO;
  }
}
Run Code Online (Sandbox Code Playgroud)
public class MathUtils {

  public static BigInteger factorial(long k) {
      return LongStream.range(2, k + 1)
              .mapToObj(BigInteger::valueOf)
              .reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current));
  }
}

Run Code Online (Sandbox Code Playgroud)

这是基于intellij junit运行器显示的带有附加的示例执行时间的测试用例作为注释。

    @Test
    public void testWithoutParallel() {
        //2s 403
        runTest(new DummyFactorialSupplier()); // uses MathUtils.factorial
    }

    @Test
    public void testParallelismWorkStealing1() {
        //1s 43
        runTest(new FastFactorialSupplier(Executors.newWorkStealingPool(1)));
    }

    @Test
    public void testParallelismForkJoin1() {
        // 711ms
        runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
    }

    @Test
    public void testExecutorForkJoin() {
        //85ms
        runTest(new FastFactorialSupplier(new ForkJoinPool()));
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
//        assertEquals(456574, result.toString().length());
    }
Run Code Online (Sandbox Code Playgroud)

用java 11日以来出现了在Java 8定制一个问题叉连接池测试运行- https://bugs.openjdk.java.net/browse/JDK-8190974

是否可以进行与伪并行处理有关的优化以及执行的调度方式,而鉴于执行纯粹是顺序执行,则没有这样的优化吗?

编辑:

我也使用jmh运行microbenchmark

平行:

public class FastFactorialSupplierP1Test {

    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }
}
Run Code Online (Sandbox Code Playgroud)

序列号:

public class SerialFactorialSupplierTest {
    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new DummyFactorialSupplier());
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }
}
Run Code Online (Sandbox Code Playgroud)
public class IterativeFactorialTest {
    @Benchmark
    @BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
    @Fork(value = 1, warmups = 1)
    public void measure() {
        runTest(new IterativeFact());
    }

    private void runTest(FactorialSupplier factorialSupplier) {
        BigInteger result = factorialSupplier.get(100000);
        assertNotNull(result);
    }

    public static void main(String[] args) throws Exception {
        org.openjdk.jmh.Main.main(args);
    }

    class IterativeFact implements FactorialSupplier {

        @Override
        public BigInteger get(long k) {
            BigInteger result = BigInteger.ONE;

            while (k-- != 0) {
                result = result.multiply(BigInteger.valueOf(k));
            }

            return result;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

结果:

FastFactorialSupplierP1Test.measure                    avgt    5  0.437 ± 0.006   s/op
IterativeFactorialTest.measure                         avgt    5  2.643 ± 0.383   s/op
SerialFactorialSupplierTest.measure                    avgt    5  2.226 ± 0.044   s/op
Run Code Online (Sandbox Code Playgroud)

Hol*_*ger 5

您选择了一个性能取决于评估顺序的操作。仅考虑性能BigInteger.multiply取决于两个因素的大小。然后,遍历一系列BigInteger实例,这些实例的累加值是下次乘法的一个因素,这会使操作变得越来越昂贵,而且距离您越远。

相反,将值范围分成较小的范围,对每个范围分别进行乘法,然后将范围的结果相乘,即使没有同时评估这些子范围,也可以获得性能优势。

因此,当并行流将工作拆分为多个块,以供其他工作线程潜在地使用,但最终在同一线程中对其进行评估时,由于更改了评估顺序,因此在此特定设置中,您仍然可以获得性能上的提高。

我们可以通过删除所有与Stream和线程池相关的工件来进行测试:

public static BigInteger multiplyAll(long from, long to, int split) {
    if(split < 1 || to - from < 2) return serial(from, to);
    split--;
    long middle = (from + to) >>> 1;
    return multiplyAll(from, middle, split).multiply(multiplyAll(middle, to, split));
}

private static BigInteger serial(long l1, long l2) {
    BigInteger bi = BigInteger.valueOf(l1++);
    for(; l1 < l2; l1++) {
        bi = bi.multiply(BigInteger.valueOf(l1));
    }
    return bi;
}
Run Code Online (Sandbox Code Playgroud)

我没有手头的JMH设置来发布压力较大的结果,但是一个简单的运行显示,数量级与您的结果相符,仅一个分割就已经将评估时间大致减少了一半,而更高的数字仍然可以改善性能,尽管曲线变得讨人喜欢。

如中所述ForkJoinTask.html#getSurplusQueuedTaskCount(),这是一种合理的策略,将工作分割成每个工作人员还有一些其他任务,可能由其他线程承担,这可能补偿不平衡的工作负载,例如,如果某些元素的处理成本比其他元素便宜。显然,并行流没有用于处理没有其他工作线程的情况的特殊代码,因此,即使只有一个线程来处理它,您也可以看到拆分工作的效果。