为什么Java Stream生成器无序?

Ale*_*rev 10 java java-stream

我尝试将一些工作与Java Streams并行化.让我们考虑一下这个简单的例子:

Stream.generate(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return generateNewInteger();
        }
    })
    .parallel()
    .forEachOrdered(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });
Run Code Online (Sandbox Code Playgroud)

问题是它没有调用accept方法forEachOrdered,它只在我使用时才有效forEach.我想问题是Stream.generate内部创造的InfiniteSupplyingSpliterator没有ORDERED特征.

问题是为什么?看起来我们知道数据的生成顺序.第二个问题是如何forEachOrdered在生成流元素的情况下对并行化流进行操作?

Hol*_*ger 11

最简单的答案是Stream.generate无序的,因为它的规范是这样说的.

这并不是说如果实现尝试在可能的情况下按顺序处理项目,那实际上恰恰相反.一旦操作被定义为无序,实施将尽可能从无序性中获益.如果您在无序操作中遇到类似于源订单的内容,则可能无法从无序处理中获益,或者实现尚未使用所有机会.由于这可能会在将来的版本或替代实现中发生变化,因此如果已将操作指定为无序,则不得依赖订单.

定义Stream.generate为无序的意图在与有序的比较时可能会变得更加清晰Stream.iterate.传递给的函数iterate将接收其前一个元素,因此元素之间存在先前的后续关系,因此是一个排序.传递的供应商Stream.generate没有收到前一个元素,换句话说,在仅考虑功能签名时与前一个元素没有关系.这适用于Stream.generate(() -> constant)Stream.generate(Type::new)类似用例,但较少Stream.generate(instance::statefulOp),这似乎不是预期的主要用例.它仍然有效,如果操作是线程安全的,你可以忍受流的无序性质.

你的例子永远不会取得进展的原因是forEachOrdered实际上并没有考虑无序性质,而是试图在遇到顺序分裂后处理块,即所有子任务都尝试缓冲它们的元素,以便它们可以通过一旦他们左边的子任务完成,他们就会采取行动.当然,缓冲和无限源不能很好地结合在一起,特别是因为底层InfiniteSupplyingSpliterator将分裂成自己无限的子任务.原则上,最左边的任务可以将其元素直接提供给操作,但是任务似乎在队列中的某个位置,等待激活,这将永远不会发生,因为所有工作线程已经忙于处理其他无限子-任务.最终,整个操作将打破OutOfMemoryError,如果你让它运行足够长...