takeWhile()与flatmap的工作方式不同

Jee*_*ese 75 java lambda java-stream java-9

我正在创建片段与takeWhile探索其可能性.与flatMap结合使用时,行为与预期不符.请在下面找到代码段.

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));
Run Code Online (Sandbox Code Playgroud)

实际产量:

Sample1
Sample2
Sample3
Sample5
Run Code Online (Sandbox Code Playgroud)

ExpectedOutput:

Sample1
Sample2
Sample3
Run Code Online (Sandbox Code Playgroud)

期望的原因是takeWhile应该执行直到内部条件变为真.我还在flatmap中添加了printout语句以进行调试.流返回两次,符合预期.

但是,如果链中没有flatmap,这样可以正常工作.

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));
Run Code Online (Sandbox Code Playgroud)

实际产量:

Sample3
Run Code Online (Sandbox Code Playgroud)

这里实际输出与预期输出匹配.

免责声明:这些代码段仅用于代码练习,不提供任何有效的用例.

更新: 错误JDK-8193856:修复将作为JDK 10的一部分提供.更改将更正whileOps Sink :: accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}
Run Code Online (Sandbox Code Playgroud)

改变实施:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}
Run Code Online (Sandbox Code Playgroud)

Nic*_*lai 54

这是JDK 9中的一个错误 - 来自问题#8193856:

takeWhile错误地假设上游操作支持并尊重取消,但遗憾的是并非如此flatMap.

说明

如果订购了流,takeWhile则应显示预期的行为.在您的代码中并非完全如此,因为您使用了forEach放弃订单的情况.如果你关心它,你在这个例子中做,你应该使用forEachOrdered.有趣的是:这不会改变任何事情.

那么也许首先没有订购流?(在这种情况下,行为是正常的.)如果为从中创建的流创建临时变量strArray并通过((StatefulOp) stream).isOrdered();在断点处执行表达式来检查它是否被排序,您会发现它确实是有序的:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);
Run Code Online (Sandbox Code Playgroud)

这意味着这很可能是一个实现错误.

进入守则

正如其他人所怀疑的那样,我现在也认为这可能flatMap渴望有关.更确切地说,这两个问题可能具有相同的根本原因.

查看源代码WhileOps,我们可以看到以下方法:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}
Run Code Online (Sandbox Code Playgroud)

此代码用于takeWhile检查给定的流元素t是否predicate已满足:

  • 如果是这样,它会将元素传递给downstream操作,在这种情况下System.out::println.
  • 如果不是,则设置take为false,因此当下次询问管道是否应该被取消时(即已完成),它将返回true.

这涵盖了takeWhile操作.您需要知道的另一件事是forEachOrdered导致执行该方法的终端操作ReferencePipeline::forEachWithCancel:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}
Run Code Online (Sandbox Code Playgroud)

这一切都是:

  1. 检查管道是否被取消
  2. 如果没有,将水槽推进一个元素
  3. 如果这是最后一个元素,请停止

看起来很有前途吧?

没有 flatMap

在"好的情况"(没有flatMap;你的第二个例子)forEachWithCancel直接对WhileOpas进行操作sink,你可以看到它是如何发挥作用的:

  • ReferencePipeline::forEachWithCancel 它的循环:
    • WhileOps::accept 给出每个流元素
    • WhileOps::cancellationRequested 每个元素后都会被查询
  • 在某些时候"Sample4",谓词失败,流被取消

好极了!

flatMap

在"最坏情况"(与flatMap您的第一个例子),forEachWithCancel运行在flatMap运行,虽然,这只是调用forEachRemainingArraySpliterator进行{"Sample3", "Sample4", "Sample5"},这将会:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}
Run Code Online (Sandbox Code Playgroud)

忽略所有这些hifence东西,只有在为并行流拆分数组处理时才使用,这是一个简单的for循环,它将每个元素传递给takeWhile操作,但从不检查它是否被取消.因此,它会在停止之前热切地穿过那个"子流"中的所有元素,甚至可能通过流的其余部分.

  • @Eugene:好吧,我打赌它与[这一个](/sf/ask/2046056141/)相关联.它恰好适用于终端短路操作,因为它们忽略了多余的元素,但现在我们有中间短路操作......所以它实际上是好消息,因为它意味着现在有更多的压力需要修复这个bug(糟糕的性能或打破子流是无限的,显然是不够的)... (17认同)
  • 好像,他们听说过你:https://bugs.openjdk.java.net/browse/JDK-8193856 (16认同)
  • 它不会遍历整个流.如果子流的最后一个元素与谓词匹配,则外部流的取消支持将起作用,例如使用`String [] [] strArray = {{"Sample1","Sample2"},{"Sample3","Sample4" },{"Sample5","Sample6"},};`作为输入,它似乎工作.如果只有一个中间元素匹配,那么`flatMap`对取消的无知会导致该标志被后续元素的评估覆盖. (10认同)

Eug*_*ene 20

无论我怎么看,这都是一个错误 - 感谢Holger的评论.我不想把这个答案放在这里(严肃地说!),但没有一个答案清楚地说明这是一个错误.

人们说这必须是有序/无序的,这不是真的,因为这将报告true3次:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));
Run Code Online (Sandbox Code Playgroud)

如果您将其更改为:

String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};
Run Code Online (Sandbox Code Playgroud)

然后Sample7Sample8不会成为输出的一部分,否则他们会.它似乎flatmap 忽略了将引入的取消标志dropWhile.


Mic*_*ael 11

如果您查看以下文档takeWhile:

如果对此流进行排序,则[返回]一个流,该流由从该流中获取的与给定谓词匹配的最长元素前缀组成.

如果此流是无序的,则[返回]一个流,该流由从该流中获取的与给定谓词匹配的元素子集组成.

你的流是巧合的,但takeWhile 不知道它是.因此,它返回第二个条件 - 子集.你takeWhile的表现就像一个人filter.

如果你sorted之前添加了一个电话takeWhile,你会看到你期望的结果:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));
Run Code Online (Sandbox Code Playgroud)

  • 为什么没有订购,或者它为什么不知道它是?应该订购有序流的"串联",不是吗? (17认同)
  • "*但是TakeWhile并不知道它是*"......好吧*为什么*当流和它的子流*被排序时它不知道它为什么是`.sorted().unordered().takeWhile (...)`那时还在做正确的事吗?我会说,这是因为`sorted`是一个有状态的操作,它缓冲整个输入,然后是一个真正的懒惰迭代. (10认同)
  • @JBNizet然后如果你采取每个单独的步骤`Stream <String []> s1 = Arrays.stream(strArray); System.out.println(s1.spliterator().hasCharacteristics(Split erator.ORDERED))`等等每个步骤 - 它们都会生成一个`ORDERED`流,这看起来像一个尚未报告的bug (9认同)
  • @Michael,因为我看到它(根据之前的评论) - 你的结论对我来说是错误的 (8认同)
  • "你的流是巧合地排序的,但是TakeWhile并不知道它是.因此,它正在返回第二个条件 - 子集.你的takeWhile就像一个过滤器.":但这听起来真的错了.如果未对流进行排序,则它将以某种不可预测的顺序返回其元素.现在,`takeWhile`应该按照它接收它们的顺序对它实际接收的元素进行操作,并在元素不满足其谓词时立即停止.如果想要过滤无序流,他们应该使用`filter`. (2认同)

Nam*_*man 9

其原因是flatMap也作为一个操作的中间操作与(之一)的状态短路中间操作 takeWhile时使用.

flatMapHolger在这个答案中指出的行为当然是一个不应错过的参考,以理解这种短路操作的意外输出.

通过引入终端操作来确定性地使用有序流并对样本执行以下操作,可以通过拆分这两个中间操作来实现预期结果:

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

此外,似乎有一个相关的Bug#JDK-8075939来跟踪已注册的此行为.

编辑:这可以在 JDK-8193856中进一步跟踪,作为错误接受.

  • 我不明白你的解释.对我来说,这种行为似乎是一个错误.而您建议的替代方案需要两个Stream流水线,这可能不太理想. (8认同)
  • @Eran确实这种行为看起来像个bug.建议的另一种方法是引入终端操作来完成(排除)"flatMap"操作,然后处理流以执行`takeWhile`. (2认同)