并行流非并发无序收集器

Eug*_*ene 6 java-8 java-stream

假设我有这个自定义收藏家:

  public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

     @Override
     public Supplier<List<T>> supplier() {
         return ArrayList::new;
     }

     @Override
     public BiConsumer<List<T>, T> accumulator() {
         return List::add;
     }

     @Override
     public BinaryOperator<List<T>> combiner() {
         return (l1, l2) -> {
            l1.addAll(l2);
            return l1;
         };
     }

     @Override
     public Function<List<T>, List<T>> finisher() {
         return Function.identity();
     }

     @Override
     public Set<java.util.stream.Collector.Characteristics> characteristics() {
         return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
     }
}
Run Code Online (Sandbox Code Playgroud)

这正是Collectors#toList实现的一个细微差别:还添加了UNORDERED特性.

我会假设运行此代码:

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
        if (!result.equals(list)) {
            System.out.println(result);
            break;
        }
    }
Run Code Online (Sandbox Code Playgroud)

应该实际上产生一些结果.但事实并非如此.

我看了一下罩子. ReferencePipeline#collect首先检查流是并行的,收集器是并发的还是收集器是无序的.缺少并发,因此它通过从此收集器创建TerminalOp来委托方法评估.引擎盖下的是ReducingSink,它实际上关心收集器是否无序:

         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    }; 
Run Code Online (Sandbox Code Playgroud)

我没有进一步调试,因为它变得非常复杂.

因此,这里可能有一条捷径,有人可以解释我所缺少的东西.它是一个并行流,用于收集非并发无序收集器中的元素.不应该没有关于线程如何将结果组合在一起的顺序?如果不是,这里的订单如何(由谁)施加?

Hol*_*ger 6

请注意,使用时结果相同list .parallelStream() .unordered() .collect(Collectors.toList()),在任何一种情况下,当前实现中都不使用无序属性.

但是让我们稍微改变一下设置:

List<Integer> list = Collections.nCopies(10, null).stream()
    .flatMap(ig -> IntStream.range(0, 100).boxed())
    .collect(Collectors.toList());
List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));

for (int i = 0; i < 100_000; i++) {
    List<Integer> result = list.parallelStream()
      .distinct()
      .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
    if (!result.equals(reference)) {
        System.out.println(result);
        break;
    }
}
Run Code Online (Sandbox Code Playgroud)

使用这个答案characteristics收集器工厂
有趣的结果是,在之前的Java 8版本中1.8.0_60,这不同的结果.如果我们使用具有不同标识的对象而不是规范Integer实例,我们可以检测到在这些早期版本中,不仅列表的顺序不同,而且结果列表中的对象不是第一个遇到的实例.

这样的终端的操作的无序特性被传播到流,影响的行为distinct(),类似于的skiplimit,如所讨论这里在这里.

正如在第二个链接线程中所讨论的那样,反向传播已被完全删除,这在第二次考虑它时是合理的.因为distinct,skip并且limit,源的顺序是相关的并且忽略它只是因为在后续阶段中将忽略该顺序是不对的.因此,可以从反向传播中受益的唯一剩余的有状态中间操作将在sorted以后忽略该订单时变得过时.但sorted无论如何,与无序接收器结合更像是编程错误......

对于无状态中间操作,订单无论如何都是无关紧要的.流处理的工作原理是将源拆分为块,在合并到结果容器之前,将所有无状态中间操作独立应用于其元素并收集到本地容器中.因此,合并步骤是唯一的地方,尊重或忽略(块的)顺序将对结果和可能的性能产生影响.

但影响不是很大.当您实现此类操作时,例如通过ForkJoinTasks,您只需将任务拆分为两个,等待它们完成并合并它们.或者,任务可以将块拆分为子任务,就地处理其剩余块,等待子任务并合并.在任何一种情况下,由于启动任务具有对相邻任务的引用这一事实,因此按顺序合并结果是自然的.要改为与不同的块合并,首先必须以某种方式找到相关的子任务.

与不同任务合并的唯一好处是,如果任务需要不同的时间来完成,您可以与第一个完成的任务合并.但是当等待Fork/Join框架中的子任务时,线程将不会处于空闲状态,框架将使用该线程处理其中的其他待处理任务.因此,只要主要任务被分成足够的子任务,就会有完全的CPU利用率.此外,分裂器试图分成均匀的块以减少计算时间之间的差异.很可能,替代无序合并实现的好处并不能证明代码重复的合理性,至少在当前实现方面是这样.

尽管如此,报告无序特性允许实现在有益并且实现可以改变时利用它.

  • 确切地说,底线是一样的.我只详细介绍了潜在的好处,历史以及为什么潜在的性能改进被认为不值得在当前实现中添加其他代码.我预计,在此之前会发生重大改写或出现其他有状态操作(再次). (3认同)