并行转换流时如何使用收集器

use*_*315 9 java multithreading java-8 java-stream collectors

我实际上试图回答这个问题如何跳过从Files.lines获得的Stream <String>的行.所以我虽然这个收集器并不能很好地并行工作:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}
Run Code Online (Sandbox Code Playgroud)

但它的确有效.

编辑:它实际上没有工作; 我被我的输入集太小而无法触发任何并行性这一事实所迷惑; 见评论中的讨论.

我认为这是行不通的,因为我想到了以下两个执行计划.


1. counter数组在所有线程之间共享.

线程t1读取Stream的第一个元素,因此满足if条件.它将第一个元素添加到其列表中.然后在他有时间更新数组值之前停止执行.

线程t2,从流的第4个元素开始,将其添加到列表中.所以我们最终得到了一个非想要的元素.

当然,既然这个收藏家似乎有效,我猜它不会那样.而且无论如何更新都不是原子的.


2.每个线程都有自己的数组副本

在这种情况下,更新没有更多的问题,但没有什么能阻止线程t2不会从流的第4个元素开始.所以他也不像那样工作.


所以它似乎根本不起作用,这让我想到了......收集器如何并行使用?

有人能解释我基本上它是如何工作的以及为什么我的收藏家在并行运行时工作?

非常感谢你!

Tho*_*lut 5

parallel()源流传递到收集器足以打破逻辑,因为共享状态(counter)可能会从不同的任务中增加.您可以验证,因为它永远不会为任何有限的流输入返回正确的结果:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());
Run Code Online (Sandbox Code Playgroud)

请注意,对于无限流(例如,从读取时Files.lines()),您需要在流中生成大量数据,因此它实际上要求任务同时运行一些块.

我的输出是:

false
true
12386
Run Code Online (Sandbox Code Playgroud)

这显然是错的.


正如@Holger在评论中正确指出的那样,当收集器指定时会发生不同的竞争,CONCURRENT并且UNORDERED在这种情况下,它们在任务之间运行单个共享集合(ArrayList::new每个流调用一次),其中只有parallel()它将在每个任务的集合上运行累加器,然后使用您定义的组合器组合结果.

如果您要将特征添加到收集器,则由于单个集合中的共享状态,您可能会遇到以下结果:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
Run Code Online (Sandbox Code Playgroud)

  • 它是.但请注意,您甚至不需要大型数据集来发现问题.它只是需要大数据的文件I/O情况.我设法用一个简单的表达式产生错误的结果,如"Stream.of("1","2","3").parallel().collect(oddLines())`... (2认同)