如果并行处理,为什么在一个无限的数字流中按素数进行过滤?

wwe*_*ner 11 concurrency java-8 java-stream

我正在创建一个从200万开始的无限整数流,使用朴素素数测试实现过滤此流以生成负载并将结果限制为10.

Predicate<Integer> isPrime = new Predicate<Integer>() {
    @Override
    public boolean test(Integer n) {
        for (int i = 2; i < n; i++) {
            if (n % i == 0) return false;   
        }
        return true;
    }
};

Stream.iterate(200_000_000, n -> ++n)
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));
Run Code Online (Sandbox Code Playgroud)

这按预期工作.

现在,如果我在过滤之前添加对parallel()的调用,则不会产生任何内容并且处理无法完成.

Stream.iterate(200_000_000, n -> ++n)
    .parallel()
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));
Run Code Online (Sandbox Code Playgroud)

有人能指出我在这里发生的事情的正确方向吗?

编辑:我不是在寻找更好的素性测试实现(它旨在成为一个长期运行的实现),而是为了解释使用并行流的负面影响.

Tag*_*eev 12

处理实际上已完成,但可能需要很长时间,具体取决于计算机上的硬件线程数.有关限制的API文档警告说,并行流可能会很慢.

实际上,并行流首先根据可用的并行级别将计算分成几个部分,对每个部分执行计算,然后将结果连接在一起.你的任务有多少部分?每个常见的FJP线程(= Runtime.getRuntime().availableProcessors())加上(有时?)一个当前线程,如果它不在FJP中.你可以控制它的添加

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
Run Code Online (Sandbox Code Playgroud)

实际上,对于您的任务,您设置的数字越小,计算的速度就越快.

如何拆分无限任务?您的特定任务由IteratorSpliterator处理,trySplit方法从1024开始创建不断增加的大小块.您可以自己尝试:

Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
    spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
    System.out.print((i+1)+": ");
    spliterators[i].tryAdvance(System.out::println);
}       
Run Code Online (Sandbox Code Playgroud)

所以第一个块处理范围为200000000-200001023的数字,第二个处理范围为200001024-200003071的数字,依此类推.如果您只有1个硬件线程,则您的任务将被拆分为两个块,因此将检查3072.如果您有8个硬件线程,则您的任务将被拆分为9个块,并将检查46080个数字.只有在处理完所有块之后,并行计算才会停止.将任务分成如此大块的启发式方法在你的情况下效果不好,但你会看到性能提升,该区域的素数在数千个数字中出现一次.

可能您的特定场景可以在内部进行优化(即,如果第一个线程发现已经达到限制条件,则停止计算).随意向Java bug跟踪器报告错误.


在Stream API中挖掘更多内容后更新我得出结论,当前行为是一个错误,引发了一个问题并发布了一个补丁.这个补丁可能会被JDK9接受,甚至可能会向后移植到JDK 8u分支.使用我的补丁,并行版本仍然无法提高性能,但至少其工作时间与顺序流工作时间相当.