如何懒惰地评估嵌套的 flatMap

Nir*_*rro 14 java cartesian-product lazy-evaluation java-stream

我试图从两个潜在的无限流中召唤一个笛卡尔积,然后我通过limit().

到目前为止,这(大约)是我的策略:

@Test
void flatMapIsLazy() {
        Stream.of("a", "b", "c")
            .flatMap(s -> Stream.of("x", "y")
                .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                    .mapToObj(sd::repeat)))
            .map(s -> s + "u")
            .limit(20)
            .forEach(System.out::println);
}
Run Code Online (Sandbox Code Playgroud)

这不起作用。

显然,我的第二个流在第一次在管道上使用时就被当场进行了最终评估。它不会产生我可以按照自己的节奏使用的惰性流。

我认为.forEach这段代码中的原因ReferencePipeline#flatMap是:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
               result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我希望上面的代码返回 20 个元素,如下所示:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx
Run Code Online (Sandbox Code Playgroud)

但相反,它崩溃了OutOfMemoryError,因为Stream嵌套的很长很长flatMap(??)并用重复字符串的不必要副本填满了我的记忆。如果代替Integer.MAX_VALUE,提供值 3,将相同的限制保持在 20,则预期输出将改为:

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)
Run Code Online (Sandbox Code Playgroud)

编辑:此时我刚刚使用惰性迭代器推出了我自己的实现。不过,我认为应该有一种方法可以用纯 Streams 做到这一点。

编辑 2:这已被承认为 Java 中的错误票https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20

Don*_*nat 5

正如您已经写的那样,这已被视为错误。也许,它会在 Java 的未来版本中解决。

但即使现在也可能有解决方案。它不是很优雅,只有在外部流中的元素数量和限制足够小时才可能可行。但它会在这些限制下工作。

让我首先通过将外部flatMap转换为两个操作(amap和 aflatMap具有标识,只执行展平)来稍微修改您的示例:

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s)
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

我们可以很容易地看到,我们需要的每个内部流不超过 20 个元素。所以我们可以将每个流限制为这个数量的元素。这将起作用(您应该使用变量或常量作为限制):

Stream.of("a", "b", "c")
      .map(s -> Stream.of("x", "y")
            .flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s.limit(20))            // limit each inner stream
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

当然这样还是会产生过多的中间结果,但是在上面的限制下可能问题不大。