为什么不能并行减少流的流?/ stream已经被操作或关闭

Cer*_*ber 2 java parallel-processing java-8 java-stream

上下文

我偶然发现了一个相当烦人的问题:我有一个包含大量数据源的程序,它能够传输相同类型的元素,并且我想"映射"程序中的每个可用元素(元素顺序不物).

因此我试图将我Stream<Stream<T>> streamOfStreamOfT;简化为简单的Stream<T> streamOfT;使用streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);

由于元素顺序对我来说并不重要,我尝试将reduce操作并行化为.parallel():streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat);但是这会触发一个java.lang.IllegalStateException: stream has already been operated upon or closed

要亲自体验它,只需通过注释/取消注释即可使用以下主要内容(java 1.8u20) .parallel()

public static void main(String[] args) {
    // GIVEN
    List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
    for (int j = 0; j < 10; j++) {
        IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
                .limit(10);
        Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
                intStreamOf10Ints.spliterator(), true);
        listOfStreamOfInts.add(genericStreamOf10Ints);
    }
    Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
            .stream();
    // WHEN
    Stream<Integer> streamOfInts = streamOfStreamOfInts
            // ////////////////
            // PROBLEM
            //    |
            //    V
            .parallel()
            .reduce(Stream.empty(), Stream::concat);

    // THEN
    System.out.println(streamOfInts.map(String::valueOf).collect(
            joining(", ")));
}
Run Code Online (Sandbox Code Playgroud)

有人可以解释这个限制吗?/找到一种更好的方法来处理流的并行减少


编辑1

继@Smutje和@LouisWasserman评论之后,似乎这.flatMap(Function.identity())是一个容忍.parallel()流的更好的选择

Bri*_*etz 7

reduce您正在使用的形式采用身份和关联组合功能.但这Stream.empty()不是一个价值; 它有州.流不是数组或集合之类的数据结构; 它们是通过可能并行的聚合操作来推送数据的载体,它们具有一些状态(比如流是否被消耗.)想想它是如何工作的; 你要构建一棵树,其中同一个"空"流出现在多个叶子中.当你尝试两次使用这个有状态的非同一性(它不会顺序发生,但会并行发生)时,第二次尝试遍历那个空流时,它将被正确地看作已被使用.

所以问题是,你只是reduce错误地使用这种方法.问题不在于并行性; 简单来说,并行性暴露了潜在的问题.

其次,即使按照你认为应该的方式"工作",你也只能建立代表扁平流的树的并行性; 当你去加入时,那是一个连续的流管道.糟糕!

第三,即使你按照你认为的方式"工作",你也会通过建立连接流来增加很多元素访问开销,而你却不会从你正在寻求的并行性中获益.

简单的答案是平整流:

String joined = streamOfStreams.parallel()
                               .flatMap(s -> s)
                               .collect(joining(", "));
Run Code Online (Sandbox Code Playgroud)