Java 8流串行和并行性能

Ami*_*ani 10 java performance java-8 java-stream

在我的机器上,下面的程序打印:

OptionalLong[134043]
 PARALLEL took 127869 ms
OptionalLong[134043]
 SERIAL took 60594 ms
Run Code Online (Sandbox Code Playgroud)

我不清楚为什么串行执行程序比并行执行程序要快.我把两个程序-Xms2g -Xmx2g放在一个8gb相对安静的盒子上.有人可以澄清最新情况吗?

import java.util.stream.LongStream;
import java.util.stream.LongStream.Builder;

public class Problem47 {

    public static void main(String[] args) {

        final long startTime = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).parallel().limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime = System.currentTimeMillis();
        System.out.println(" PARALLEL took " +(endTime - startTime) + " ms");

        final long startTime2 = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime2 = System.currentTimeMillis();
        System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms");
    }

    static boolean fourConsecutives(final long n) {
        return distinctPrimeFactors(n).count() == 4 &&
                distinctPrimeFactors(n + 1).count() == 4 &&
                distinctPrimeFactors(n + 2).count() == 4 &&
                distinctPrimeFactors(n + 3).count() == 4;
    }

    static LongStream distinctPrimeFactors(long number) {
        final Builder builder = LongStream.builder();
        final long limit = number / 2;
        long n = number;
        for (long i = 2; i <= limit; i++) {
            while (n % i == 0) {
                builder.accept(i);
                n /= i;
            }
        }
        return builder.build().distinct();
    }

}
Run Code Online (Sandbox Code Playgroud)

Bri*_*etz 18

我们可以更容易并行执行,但我们不一定能使并行性变得容易.

代码中的罪魁祸首是limit + parallel的组合.实现limit()对于顺序流来说是微不足道的,但对并行流来说相当昂贵.这是因为限制操作的定义与流的遭遇顺序相关联.具有limit()的流通常比顺序流更慢,除非每个元素完成的计算非常高.

您选择的流源也限制了并行性.使用iterate(0, n->n+1)给你正整数,但从iterate根本上是连续的; 在计算第(n-1)个元素之前,不能计算第n个元素.因此,当我们尝试拆分此流时,我们最终会分裂(首先,休息).尝试使用range(0,k); 这更好地分裂,通过随机访问的一半整齐地分裂.

  • 哦,你也应该使用像JMH这样不错的微型线束.您的代码会显示所有经典的微基准测试错误(预热不足,取决于去优化偏差等.否则数字不是真正有意义的. (4认同)
  • 我刚刚注意到你的流源使用了iterate(),它也基本上是顺序的(在你计算#(n-1)之前不能计算元素#n)因此会分裂得很差.尝试用范围(0,100000)替换iterate(). (2认同)

Hol*_*ger 13

虽然Brian Goetz对你的设置是正确的,例如你应该使用.range(1, 1000000)而不是.iterate(1, n -> n + 1).limit(1000000)你的基准测试方法非常简单,我想强调一点:

即使在修复这些问题之后,即使使用挂钟和TaskManager,您也可以看到出现了问题.在我的机器上,操作大约需要半分钟,您可以看到并行性在大约两秒后降至单核.即使一个专门的基准测试工具可以产生不同的结果,除非你想一直在基准工具中运行你的最终应用程序,否则无关紧要......

现在我们可以尝试更多地模拟您的设置,或者告诉您应该学习有关Fork/Join框架的特殊内容,就像实现者在讨论列表中所做的那样.

或者我们尝试另一种实现:

ExecutorService es=Executors.newFixedThreadPool(
                       Runtime.getRuntime().availableProcessors());
AtomicLong found=new AtomicLong(Long.MAX_VALUE);
LongStream.range(1, 1000000).filter(n -> found.get()==Long.MAX_VALUE)
    .forEach(n -> es.submit(()->{
        if(found.get()>n && fourConsecutives(n)) for(;;) {
            long x=found.get();
            if(x<n || found.compareAndSet(x, n)) break;
        }
    }));
es.shutdown();
try { es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); }
catch (InterruptedException ex) {throw new AssertionError(ex); }
long result=found.get();
System.out.println(result==Long.MAX_VALUE? "not found": result);
Run Code Online (Sandbox Code Playgroud)

在我的机器上,它完成了我所期望的并行执行只需稍微多一点?sequential time?/?number of cpu cores?.不改变您的fourConsecutives实现中的任何内容.

最重要的是,至少在处理单个项目需要很长时间时,当前Stream实现(或底层的Fork/Join框架)存在问题,如本相关问题中已经讨论的那样.如果你想要可靠的并行性,我建议使用经过验证和测试ExecutorService的.正如您在我的示例中所看到的,它并不意味着删除Java 8功能,它们很好地配合在一起.Stream.parallel应谨慎使用引入的自动并行性(鉴于当前的实现).

  • 因此,您的方法可能适用于此问题,但一般情况下(当任务无法轻易地先切成相等大小的任务时),您会发现ExecutorService中的共享工作队列很快成为争用瓶颈.您描述的方法仅适用于最粗粒度的并行性. (2认同)