我应该尽可能使用并行流吗?

Mat*_*ann 455 java parallel-processing java-8 java-stream

使用Java 8和lambdas,可以很容易地将集合作为流进行迭代,并且易于使用并行流.来自docs的两个例子,第二个使用parallelStream:

myShapesCollection.stream()
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));

myShapesCollection.parallelStream() // <-- This one uses parallel
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));
Run Code Online (Sandbox Code Playgroud)

只要我不关心顺序,使用并行是否总是有益的?人们会认为将更多核心的工作划分得更快.

还有其他考虑因素吗?什么时候应该使用并行流?什么时候应该使用非并行?

(这个问题被要求引发关于如何以及何时使用并行流的讨论,而不是因为我认为总是使用它们是一个好主意.)

JB *_*zet 655

与顺序流相比,并行流具有更高的开销.协调线程需要花费大量时间.我默认使用顺序流,只考虑并行流

  • 我有大量要处理的项目(或者每个项目的处理需要时间并且可以并行化)

  • 我首先遇到了性能问题

  • 我还没有在多线程环境中运行该进程(例如:在Web容器中,如果我已经有很多请求并行处理,在每个请求中添加一个额外的并行层可能会产生更多负面影响而不是正面影响)

在您的示例中,无论如何,性能将由同步访问驱动System.out.println(),并且使此过程并行将不起作用,甚至是否定的.

此外,请记住,并行流不能神奇地解决所有同步问题.如果进程中使用的谓词和函数使用共享资源,则必须确保所有内容都是线程安全的.特别是,如果你并行,副作用是你真正需要担心的事情.

无论如何,衡量,不要猜!只有测量才能告诉您并行性是否值得.

  • 好答案.我想补充一点,如果你有大量的项目需要处理,那只会增加线程协调问题; 只有当每个项目的处理需要时间并且可并行化时,并行化可能是有用的. (15认同)
  • @WarrenDew我不同意.Fork/Join系统将简单地将N个项目拆分为例如4个部分,并按顺序处理这4个部分.然后将减少4个结果.如果真的很大,即使是快速的单元处理,并行化也是有效的.但一如既往,你必须衡量. (12认同)
  • @Harshana他显然意味着将按顺序处理4个部分中的每个部分的元素.但是,可以同时处理部件本身.换句话说,如果您有多个可用的CPU核心,则每个部分可以独立于其他部分在其自己的核心上运行,同时按顺序处理自己的元素.(注意:我不知道,如果这是并行Java流的工作方式,我只想澄清JBNizet的含义.) (2认同)

Bri*_*etz 228

Stream API旨在使编写计算变得容易,这些计算方式从执行方式中抽象出来,使顺序和并行之间的切换变得容易.

然而,仅仅因为它简单,并不意味着它总是一个好主意,事实上,仅仅因为你可以放弃所有地方是一个主意.parallel().

首先,请注意,并行性除了在有更多内核可用时更快执行的可能性之外没有任何好处.并行执行总是涉及比顺序执行更多的工作,因为除了解决问题之外,它还必须执行子任务的调度和协调.希望通过分解多个处理器的工作,您将能够更快地得到答案; 这是否真的发生取决于很多事情,包括你的数据集的大小,你在每个元素上做了多少计算,计算的性质(具体来说,一个元素的处理是否与其他元素的处理相互作用?) ,可用处理器的数量,以及竞争这些处理器的其他任务的数量.

此外,请注意并行性也经常暴露计算中的非确定性,这通常是通过顺序实现隐藏的; 有时这无关紧要,或者可以通过约束所涉及的操作来缓解(即,减少运算符必须是无状态和关联的.)

实际上,有时并行性会加速你的计算,有时它不会,有时它甚至会减慢它的速度.最好先使用顺序执行开发,然后应用并行性,其中(A)您知道实际上有益于提高性能,(B)它实际上会提高性能.(A)是业务问题,而不是技术问题.如果您是性能专家,您通常可以查看代码并确定(B),但智能路径是衡量.(而且,在你确信(A)之前不要打扰;如果代码足够快,最好在其他地方应用你的大脑周期.)

最简单的并行性能模型是"NQ"模型,其中N是元素的数量,Q是每个元素的计算.通常,在开始获得性能优势之前,您需要产品NQ超过某个阈值.对于像"将数字从1加到N"这样的低Q问题,您通常会看到N = 1000和N = 10000之间的盈亏平衡.对于Q值较高的问题,您会看到在较低阈值处出现断层现象.

但现实非常复杂.因此,在您获得专业知识之前,首先要确定顺序处理何时实际上是在为您付出代价,然后衡量并行性是否会有所帮助.

  • 本文详细介绍了NQ模型:http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html (14认同)
  • @Pacerier这是一个很好的理论,但遗憾的是天真(参见30年来尝试构建自动并行编译器的历史).由于当我们不可避免地弄错了时,正确地猜测不足以惹恼用户是不切实际的,负责任的事情就是让用户说出他们想要的东西.对于大多数情况,默认(顺序)是正确的,更可预测. (5认同)
  • "首先,请注意,当更多内核可用时,并行性没有提供更快执行的可能性" - 或者如果您正在应用涉及IO的操作(例如`myListOfURLs.stream().map((url) - > downloadPage(url))...`). (4认同)
  • @specializt:将流从顺序切换到并行*确实*改变算法(在大多数情况下).这里提到的决定论是关于你的(任意)运算符*可能依赖的属性(Stream实现不能知道),但当然*不应该依赖.这就是这个答案的那部分试图说的.如果你关心规则,就像你说的那样,你可以有一个确定的结果(否则并行流是没用的),但也有可能有意允许非确定性,比如使用`findAny`而不是`findFirst `... (3认同)
  • @Jules:永远不要对 IO 使用并行流。它们仅用于 CPU 密集型操作。并行流使用`ForkJoinPool.commonPool()`,你不希望阻塞任务去那里。 (2认同)

Ram*_*tra 58

我观看了Brian Goetz (Java语言架构师和Lambda Expressions规范负责人)的演讲之一.他在进行并行化之前详细解释了以下4个要点:

分裂/分解成本
- 有时分裂比仅仅做工作更昂贵!
任务调度/管理成本
- 可以在将工作交给另一个线程所花费的时间内完成大量工作.
结果组合成本
- 有时组合涉及复制大量数据.例如,添加数字很便宜,而合并集很昂贵.
地方
- 房间里的大象.这是每个人都可能错过的重点.您应该考虑缓存未命中,如果CPU由于缓存未命中而等待数据,那么您将无法通过并行获得任何内容.这就是为什么基于数组的源并行化最好的原因,因为下一个索引(在当前索引附近)被缓存,并且CPU遇到缓存未命中的可能性更小.

他还提到了一个相对简单的公式来确定并行加速的可能性.

NQ型号:

N x Q > 10000
Run Code Online (Sandbox Code Playgroud)

其中,
N =数据项数量
Q =每个项目的工作量

  • 布莱恩实际上回答了这个问题:) (24认同)
  • “每项工作量”以什么单位衡量?10000 代表什么? (6认同)

edh*_*ned 12

JB击中了头部.我唯一可以补充的是,Java8不进行纯粹的并行处理,它确实是顺序的.是的,我写了这篇文章,我已经做了三十年的F/J,所以我确实理解了这个问题.

  • 老实说:你的整篇论文看起来像一个大规模,精心策划的咆哮 - 而且几乎否定了它的可信度......我建议用一个不那么激进的低调来重新做这件事,否则没有多少人会费心去完全阅读它...我只是说 (14认同)
  • 流不可迭代,因为流执行内部迭代而不是外部迭代.无论如何,这就是溪流的全部原因.如果您在学术工作方面遇到问题,那么函数式编程可能不适合您.函数式编程=== math === academic.不,J8-FJ没有被打破,只是大多数人都不读f******手册.java文档非常清楚地表明它不是并行执行框架.这就是所有分裂者的全部原因.是的它是学术性的,是的,如果你知道如何使用它,它是有效的.是的,使用自定义执行程序应该更容易 (10认同)
  • 这个线程是从 2013 年开始的,从那时起发生了很多变化。此部分用于评论而不是详细答案。 (2认同)

tkr*_*use 6

切勿并行化有限制的无限流。发生的情况如下:

    public static void main(String[] args) {
        // let's count to 1 in parallel
        System.out.println(
            IntStream.iterate(0, i -> i + 1)
                .parallel()
                .skip(1)
                .findFirst()
                .getAsInt());
    }
Run Code Online (Sandbox Code Playgroud)

结果

    Exception in thread "main" java.lang.OutOfMemoryError
        at ...
        at java.base/java.util.stream.IntPipeline.findFirst(IntPipeline.java:528)
        at InfiniteTest.main(InfiniteTest.java:24)
    Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.stream.SpinedBuffer$OfInt.newArray(SpinedBuffer.java:750)
        at ...
Run Code Online (Sandbox Code Playgroud)

如果你使用也一样.limit(...)

这里的解释: Java 8,在流中使用 .parallel 会导致 OOM 错误

同样,如果流是有序的并且具有比您想要处理的元素多得多的元素,则不要使用并行,例如

public static void main(String[] args) {
    // let's count to 1 in parallel
    System.out.println(
            IntStream.range(1, 1000_000_000)
                    .parallel()
                    .skip(100)
                    .findFirst()
                    .getAsInt());
}
Run Code Online (Sandbox Code Playgroud)

这可能会运行更长时间,因为并行线程可能会在大量数字范围上工作,而不是关键的 0-100,导致这需要很长时间。