Java8 Parallel Stream花费时间对值进行求和

Abh*_*mar 4 java parallel-processing out-of-memory java-8 java-stream

我正在练习java8并行流部分并编写一个程序,它将从0传递给参数的数字加到该数字.

例如,如果我通过10,它将从1到10的数字相加并返回输出.

以下是该计划

public class ParellelStreamExample {



    public static void main(String[] args) {
        System.out.println("Long Range value - "+ Long.MIN_VALUE + " to "+ Long.MAX_VALUE);
        long startTime = System.nanoTime();
        long sum = sequentailSum(100000000);
        System.out.println(
                "Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);
        long startTime1 = System.nanoTime();
        long sum1 = parellelSum(100000000);
        System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
                + " msec with sum = " + sum1);

    }

    private static Long parellelSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    private static Long sequentailSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }
}
Run Code Online (Sandbox Code Playgroud)

我收到的输出是

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(SliceOps.java:155)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:840)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda$0(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda$3/250421012.apply(Unknown Source)
    at java.util.stream.Stream$1.next(Stream.java:1033)
    at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
    at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Run Code Online (Sandbox Code Playgroud)

为什么这个程序不会并行运行部分和gc开销,而是它应该在并行部分运行得更快,因为它使用fork/join框架并通过线程进行处理.

它出了什么问题?

Mis*_*sha 6

这里有几件事情出了问题.

  1. 您正在尝试使用System.nanoTime()JMH之类的代码来对代码进行基准测试.
  2. 你试图在一个简单的计算(sum)上Long而不是使用一个简单的计算LongStream.如果JVM无法摆脱装箱,那么指针追逐的开销很容易淹没并行性的好处.
  3. 您正在尝试对由此产生的继承顺序流进行分析iterate.流框架将尝试通过缓冲流并将其分派给多个线程来执行您所要求的操作,这会增加大量开销.
  4. 您正在使用limit有序并行流.这要求流框架执行大量额外同步,以确保使用确切的n第一个元素来生成结果.您将看到,如果您放入.unordered()并行流,执行时间将显着减少,但结果将是非确定性的,因为您将获得一些 n元素的总和而不是必然的第一 n元素.

正确的方法是使用JMH并替换iterate(...).limit(...)LongStream.rangeClosed(1, n)