排序和分离立即处理流?

Cog*_*man 13 java java-stream

想象一下,我有一些看起来像这样的东西:

Stream<Integer> stream = Stream.of(2,1,3,5,6,7,9,11,10)
            .distinct()
            .sorted();
Run Code Online (Sandbox Code Playgroud)

两者的javadocs distinct()sorted()说它们是"有状态的中间操作".这是否意味着内部流将执行类似创建哈希集,添加所有流值,然后看到sorted()将这些值抛出到排序列表或排序集?或者它比那更聪明?

换句话说,是否.distinct().sorted()导致java遍历流两次或者java延迟直到执行终端操作(例如.collect)?

Hol*_*ger 9

您已经提出了一个已加载的问题,暗示必须在两个备选方案之间进行选择.

有状态的中间操作必须存储数据,在某些情况下直到能够向下游传递元素之前存储所有元素,但这不会改变这项工作被推迟直到终端操作开始的事实.

说它必须"遍历流两次"也是不正确的.存在完全不同的遍历,例如sorted(),首先,遍历内部缓冲区的源的遍历将被排序,第二,遍历缓冲区.如果distinct()在顺序处理中没有发生第二次遍历,则内部HashSet仅用于确定是否向下游传递元素.

所以当你跑步

Stream<Integer> stream = Stream.of(2,1,3,5,3)
    .peek(i -> System.out.println("source: "+i))
    .distinct()
    .peek(i -> System.out.println("distinct: "+i))
    .sorted()
    .peek(i -> System.out.println("sorted: "+i));
System.out.println("commencing terminal operation");
stream.forEachOrdered(i -> System.out.println("terminal: "+i));
Run Code Online (Sandbox Code Playgroud)

它打印

commencing terminal operation
source: 2
distinct: 2
source: 1
distinct: 1
source: 3
distinct: 3
source: 5
distinct: 5
source: 3
sorted: 1
terminal: 1
sorted: 2
terminal: 2
sorted: 3
terminal: 3
sorted: 5
terminal: 5
Run Code Online (Sandbox Code Playgroud)

表明在终端操作开始之前没有任何事情发生,并且来自源的元素立即通过distinct()操作(除非是重复),而所有元素在sorted()传递到下游之前在操作中被缓冲.

可以进一步证明,distinct()不需要遍历整个流:

Stream.of(2,1,1,3,5,6,7,9,2,1,3,5,11,10)
    .peek(i -> System.out.println("source: "+i))
    .distinct()
    .peek(i -> System.out.println("distinct: "+i))
    .filter(i -> i>2)
    .findFirst().ifPresent(i -> System.out.println("found: "+i));
Run Code Online (Sandbox Code Playgroud)

版画

source: 2
distinct: 2
source: 1
distinct: 1
source: 1
source: 3
distinct: 3
found: 3
Run Code Online (Sandbox Code Playgroud)

正如Jose Da Silva的回答所解释和证明的那样,缓冲量可能随着有序并行流而改变,因为部分结果必须在它们传递到下游操作之前进行调整.

由于这些操作在实际终端操作未知之前不会发生,因此可能比OpenJDK中当前发生的更多优化(但可能发生在不同的实现或未来的版本中).例如,sorted().toArray()可以使用并返回相同的数组或者sorted().findFirst()可以变成a min()等.


Jos*_*lva 6

根据javadoc,distinctsorted方法都是有状态的中间操作.

StreamOps说,这个操作如下:

有状态操作可能需要在生成结果之前处理整个输入.例如,在查看流的所有元素之前,不能通过对流进行排序来产生任何结果.因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据.

但是流的收集仅在终端操作(例如toArray,collectforEach)中发生,两个操作在流水线中处理并且数据流过它.仍然需要注意的一件重要事情是执行此操作的顺序,该distinct()方法的javadoc 说:

对于有序流,不同元素的选择是稳定的(对于重复元素,保留在遇到顺序中首先出现的元素.)对于无序流,不进行稳定性保证.


对于顺序流,当对此流进行排序时,唯一检查的元素是前一个,当未排序时,内部使用HashSet,因此执行distinctsort会产生更好的性能.

(注意:正如Eugene评论的那样,在这个secuential流中,性能增益可能很小,特别是当代码很热时,但仍然避免创建额外的时间HashSet)

在这里你可以看到更多的秩序distinctsort:

Java Streams:如何做一个有效的"独特和排序"?


另一方面,对于并行流,文档说:

保持并行管道中不同()的稳定性相对昂贵(要求操作充当完全屏障,具有大量缓冲开销),并且通常不需要稳定性.如果您的情境的语义允许,使用无序流源(例如generate(Supplier))或使用BaseStream.unordered()删除排序约束可以显着提高并行管道中distinct()的执行效率.

全屏障操作意味着:

必须在下游启动之前执行所有上游操作.Stream API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(按顺序并行的情况).

出于这个原因,当使用并行流时,相反的顺序通常更好(只要当前流是无序的),即使用distinctbefore sorted,因为在处理distinct时,sorted可以开始接收元素.

使用相反的顺序,首先排序(无序并行流)然后使用distinct,在两者中都设置一个屏障,首先必须处理所有元素(流)sort,然后全部用于distinct.

这是一个例子:

Function<String, IntConsumer> process = name ->
        idx -> {
            TimeUnit.SECONDS.sleep(ThreadLocalRandom
                    .current().nextInt(3)); // handle exception or use 
                                            // LockSupport.parkNanos(..) sugested by Holger
            System.out.println(name + idx);
        };
Run Code Online (Sandbox Code Playgroud)

下面的函数接收一个名称,并返回一个从0-2秒休眠然后打印的int消费者.

IntStream.range(0, 8).parallel() // n > number of cores
        .unordered() // range generates ordered stream (not sorted)
        .peek(process.apply("B"))
        .distinct().peek(process.apply("D"))
        .sorted().peek(process.apply("S"))
        .toArray(); // terminal operation
Run Code Online (Sandbox Code Playgroud)

这将打印,混合B和D,然后是所有S(没有障碍distinct).

如果更改顺序sorteddistinct:

        // ... rest
        .sorted().peek(process.apply("S"))
        .distinct().peek(process.apply("D"))
        // ... rest
Run Code Online (Sandbox Code Playgroud)

这将打印,然后是所有B的所有S,然后是所有D(屏障distinct).

如果你想尝试更多的添加unordered之后sorted再次:

        // ... rest
        .sorted().unordered().peek(process.apply("S"))
        .distinct().peek(process.apply("D"))
        // ... rest
Run Code Online (Sandbox Code Playgroud)

这将打印,所有B,然后是S和D的混合(distinct再次没有障碍).


编辑:

改变了一些代码,以更好地解释和使用ThreadLocalRandom.current().nextInt(3)sugested.

  • `distinct()`不*需要收集整个流.此外,你不应该使用`new Random().nextInt()%3`,因为结果可能变为负数.有一个简单的专用方法来获得有界int:`new Random().nextInt(3)`.但更好的选择是`ThreadLocalRandom.current().nextInt(3)`.顺便说一句,Java甚至还有一个不太知名的方法,允许在不需要异常处理的情况下进行休眠:`LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(ThreadLocalRandom.current().nextInt(3))); ` (2认同)