为什么flatMap()之后的filter()在Java流中"不完全"懒惰?

Vad*_*dar 70 java lambda java-8 java-stream

我有以下示例代码:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
Run Code Online (Sandbox Code Playgroud)

输出如下:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1
Run Code Online (Sandbox Code Playgroud)

从这里我看到,在第一种情况下stream真的表现得懒惰 - 我们使用findFirst()所以一旦我们有第一个元素我们的过滤lambda没有被调用.然而,在使用flatMaps的第二种情况下,我们看到尽管找到满足过滤条件的第一个元素(它只是任何第一个元素,因为lambda总是返回true),流的其他内容仍然通过过滤函数被馈送.

我试图理解为什么它表现得像这样,而不是在第一个元素计算后放弃,如第一种情况.任何有用的信息将不胜感激.

Hol*_*ger 55

TL; DR,这已在JDK-8075939中得到解决,并在Java 10中得到修复.

在查看implementation(ReferencePipeline.java)时,我们看到了方法[ link ]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
Run Code Online (Sandbox Code Playgroud)

将调用以进行findFirst操作.需要注意的特殊事情是sink.cancellationRequested()允许在第一场比赛中结束循环.与[ link ] 比较

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

用于推进一个项目的方法最终调用forEach子流而没有任何提前终止的可能性,并且该flatMap方法开始时的评论甚至告知该缺失特征.

因为这不仅仅是一个优化的东西,因为它意味着代码只是在子流无限时断开,我希望开发人员很快证明他们"可以做得比这更好"......


为了说明这些含义,虽然Stream.iterate(0, i->i+1).findFirst()按预期工作,但Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst()最终会陷入无限循环.

关于规范,大部分都可以在

包规范的"流操作和流水线"一章:

...

中间操作返回一个新流.他们总是懒惰 ;

...

...懒惰还允许在没有必要时避免检查所有数据; 对于诸如"查找超过1000个字符的第一个字符串"之类的操作,只需要检查足够的字符串以找到具有所需特征的字符串,而不检查源中可用的所有字符串.(当输入流是无限的而不仅仅是大的时候,这种行为变得更加重要.)

...

此外,一些操作被认为是短路操作.如果在呈现无限输入时,它可能产生有限流,则中间操作是短路的.如果在呈现无限输入时它可以在有限时间内终止,则终端操作是短路的.在流水线中进行短路操作是处理无限流以在有限时间内正常终止的必要但不充分的条件.

很明显,短路操作不能保证有限的时间终止,例如,当过滤器与处理无法完成的任何项目不匹配时,但是通过简单忽略而不支持在有限时间内终止任何项目的实现操作的短路特性远远不符合规范.

  • 这个bug.虽然规范可能支持这种行为,但是没有人希望获得无限流的第一个元素会抛出StackOverflowError,或者最终会进入无限循环,无论它是直接来自管道源还是通过映射函数从嵌套流中获取.这应该报告为错误. (25认同)
  • @Marko Topolnik:属性"在执行管道的终端操作之前不会开始"并不否定延迟操作的其他属性.我知道所讨论的财产没有单句宣告,否则我引用了它.在[Stream`API doc](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html)中,据说"Streams很懒; 源数据的计算仅在启动终端操作时执行,并且*源元素仅在需要时消耗*. (7认同)
  • 您可能会再次质疑,这意味着对于短路有一个懒惰的执行保证,但是,我倾向于反过来看待它:在任何情况下都没有说实现可以像我们在这里看到的那样自由地实现非惰性行为.关于允许的内容和不允许的内容,规范非常详尽. (6认同)
  • @Vadym S. Khondar:提交错误报告是一个好主意.关于为什么之前没有人发现这一点,我看到很多"不敢相信我是第一个注意到这种"之前的错误.除非涉及无限流,否则此错误仅具有性能影响,在许多用例中可能会被忽视. (5认同)
  • @Eugene:目前[JDK-8075939](https://bugs.openjdk.java.net/browse/JDK-8075939)似乎没有任何活动.注意[此解决方法](http://stackoverflow.com/a/32767282/2711488)... (4认同)
  • [JDK-8075939](https://bugs.openjdk.java.net/browse/JDK-8075939)现在取得进展.有关core-libs-dev审阅线程和第一个webrev的链接,请参阅http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-December/050763.html.它适用于我们将在Java 10中看到它. (4认同)
  • @StefanZobel 看起来它也已经 [backported](https://bugs.openjdk.java.net/browse/JDK-8225328) 到 Java 8。 (2认同)

Mar*_*nik 16

输入流的元素被逐个懒惰地消耗.第一个元素1由两个flatMaps转换为流-1, 0, 1, 0, 1, 2, 1, 2, 3,因此整个流只对应于第一个输入元素.嵌套流由管道急切地实现,然后变平,然后送到filter舞台.这解释了你的输出.

上述内容并非源于一个基本限制,但它可能会使嵌套流的完全懒惰变得更加复杂.我怀疑让它变得更好是一个更大的挑战.相比之下,Clojure的懒惰seqs为每个这样的嵌套级别提供了另一层包装.由于这种设计,StackOverflowError当嵌套运行到极端时,操作甚至可能失败.

  • @MarkoTopolnik,谢谢你的回复.实际上,霍尔格带来的担忧实际上是我惊讶的原因.第二种情况是否意味着我不能将flatMap用于无限流? (2认同)

Seb*_*ian 8

关于无限子流的破坏,当引入中间(与终端相对)短路操作时,flatMap的行为变得更加令人惊讶.

虽然以下按预期工作,但打印出无限的整数序列

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

下面的代码打印出只有"1",但仍然没有终止:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

我无法想象阅读那些不是错误的规范.


Tag*_*eev 5

在我的免费StreamEx库中,我介绍了短路收集器.当使用短路收集器(如MoreCollectors.first())收集顺序流时,从源中消耗一个元素.在内部,它以非常脏的方式实现:使用自定义异常来打破控制流.使用我的库,您的样本可以通过以下方式重写:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );
Run Code Online (Sandbox Code Playgroud)

结果如下:

-1
Result: -1
Run Code Online (Sandbox Code Playgroud)