使用无序终端操作的Stream.skip行为

Tag*_*eev 32 java parallel-processing java-8 java-stream collectors

我已经阅读过这个这个问题,但仍然怀疑Stream.skipJDK作者是否打算观察到这种行为.

让我们简单输入数字1..20:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

现在让我们创建一个并行流,以不同的方式结合unordered()使用skip()并收集结果:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

过滤步骤在这里基本没什么,但为流引擎增加了更多的难度:现在它不知道输出的确切大小,因此关闭了一些优化.我有以下结果:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18
Run Code Online (Sandbox Code Playgroud)

结果完全没问题,一切都按预期工作.在第一种情况下,我要求跳过前两个元素,然后收集列表,没有特别的顺序.在第二种情况下,我要求跳过第一个元素,然后转为无序并跳过一个元素(我不关心哪一个).在第三种情况下,我首先转为无序模式,然后跳过两个任意元素.

让我们跳过一个元素并以无序模式收集到自定义集合.我们的定制系列将是HashSet:

System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));
Run Code Online (Sandbox Code Playgroud)

输出令人满意:

skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped
Run Code Online (Sandbox Code Playgroud)

所以一般来说,我希望只要有序流,skip()跳过第一个元素,否则它会跳过任意​​元素.

但是,让我们使用等效的无序终端操作collect(Collectors.toSet()):

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));
Run Code Online (Sandbox Code Playgroud)

现在的输出是:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped
Run Code Online (Sandbox Code Playgroud)

相同的结果可以与任何其他无序终端操作来实现(如forEach,findAny,anyMatch等等).unordered()在这种情况下删除步骤不会改变.似乎unordered()步骤正确地使流从当前操作开始无序,无序终端操作使整个流从一开始就开始无序,尽管如果skip()使用它会影响结果.这对我来说似乎完全是误导:我希望使用无序收集器与在终端操作之前将流转换为无序模式并使用等效的有序收集器相同.

所以我的问题是:

  1. 这种行为是打算还是错误?
  2. 如果是,它在某处记录了吗?我读过Stream.skip()文档:它没有说明无序的终端操作.另外,Characteristics.UNORDERED文档不是很理解,也没有说整个流的排序会丢失.最后,包概要中的订购部分也未涵盖此案例.可能我错过了什么?
  3. 如果无意的终端操作意图使整个流无序,那么为什么unordered()步骤仅在此之后才使其无序?我可以依靠这种行为吗?或者我很幸运,我的第一次测试工作得很好?

Bri*_*etz 24

回想一下,流标志(ORDERED,SORTED,SIZED,DISTINCT)的目标是使操作能够避免做不必要的工作.涉及流标志的优化示例如下:

  • 如果我们知道流已经排序,那么sorted()就是无操作;
  • 如果我们知道流的大小,我们可以预先分配一个正确大小的数组toArray(),避免复制;
  • 如果我们知道输入没有有意义的遭遇顺序,我们就不需要采取额外的步骤来保留遭遇顺序.

管道的每个阶段都有一组流标志.中间操作可以注入,保留或清除流标志.例如,过滤保留了sorted-ness/distinct-ness但不保留大小; 映射保留大小但不是排序或不同的.排序注入排序.中间操作的标志处理相当简单,因为所有决策都是本地的.

终端操作的标志处理更加微妙.ORDERED是终端操作最相关的标志.如果终端操作是UNORDERED,那么我们会反向传播无序的.

我们为什么要做这个?好吧,考虑这个管道:

set.stream()
   .sorted()
   .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

由于forEach不限制按顺序操作,因此对列表进行排序的工作完全是浪费精力.所以我们反向传播这些信息(直到我们遇到短路操作,例如limit),以免失去这个优化机会.同样,我们可以使用distinct无序流的优化实现.

这种行为是打算还是错误?

是:)反向传播是预期的,因为它是一种有用的优化,不应产生不正确的结果.然而,bug部分是我们正在传播过去的skip,我们不应该.所以UNORDERED标志的反向传播是过于激进的,这是一个错误.我们会发布一个错误.

如果是,它在某处记录了吗?

它应该只是一个实现细节; 如果它被正确实现,你不会注意到(除了你的流更快.)

  • 经过一些分析后,我们选择完全退出反向传播.唯一可以获得回报的地方就是优化排序; 如果你有一个排序到无序终端op的管道,那么无论如何这可能是一个用户错误. (7认同)
  • @Holger更正,终端标志没有更多的反向传播,这意味着终端操作的有序性(或缺乏)不会影响*之前*操作的行为.当然,`forEach`和`forEachOrdered.之间仍然存在差异 (3认同)
  • 谢谢!这正是我正在等待的答案:-) 我已经实现了一个 [skipOrdered](https://github.com/amaembo/streamex/blob/8e097c4eaf63baec162424ef07c7b2cf1d669f72/src/main/java/javax/util/streamex/AbsStream java#L1186) 方法在我的库中解决该错误。它采用流拆分器,将其转换为顺序,执行“skip()”,然后在必要时将其转回“parallel()”。希望原来的`skip()` 会在JDK9 中被修复,所以这个方法将变得不必要。 (2认同)
  • @Brian Goetz:只是为了让它正确,终端操作将不再反向传播无序属性?那么这是否意味着`forEach`和`forEachOrdered`在这方面没有区别? (2认同)