想象一下,我有一些看起来像这样的东西:
Stream<Integer> stream = Stream.of(2,1,3,5,6,7,9,11,10)
.distinct()
.sorted();
Run Code Online (Sandbox Code Playgroud)
两者的javadocs distinct()并sorted()说它们是"有状态的中间操作".这是否意味着内部流将执行类似创建哈希集,添加所有流值,然后看到sorted()将这些值抛出到排序列表或排序集?或者它比那更聪明?
换句话说,是否.distinct().sorted()导致java遍历流两次或者java延迟直到执行终端操作(例如.collect)?
您已经提出了一个已加载的问题,暗示必须在两个备选方案之间进行选择.
有状态的中间操作必须存储数据,在某些情况下直到能够向下游传递元素之前存储所有元素,但这不会改变这项工作被推迟直到终端操作开始的事实.
说它必须"遍历流两次"也是不正确的.存在完全不同的遍历,例如sorted(),首先,遍历内部缓冲区的源的遍历将被排序,第二,遍历缓冲区.如果distinct()在顺序处理中没有发生第二次遍历,则内部HashSet仅用于确定是否向下游传递元素.
所以当你跑步
Stream<Integer> stream = Stream.of(2,1,3,5,3)
.peek(i -> System.out.println("source: "+i))
.distinct()
.peek(i -> System.out.println("distinct: "+i))
.sorted()
.peek(i -> System.out.println("sorted: "+i));
System.out.println("commencing terminal operation");
stream.forEachOrdered(i -> System.out.println("terminal: "+i));
Run Code Online (Sandbox Code Playgroud)
它打印
commencing terminal operation
source: 2
distinct: 2
source: 1
distinct: 1
source: 3
distinct: 3
source: 5
distinct: 5
source: 3
sorted: 1
terminal: 1
sorted: 2
terminal: 2
sorted: 3
terminal: 3
sorted: 5
terminal: 5
Run Code Online (Sandbox Code Playgroud)
表明在终端操作开始之前没有任何事情发生,并且来自源的元素立即通过distinct()操作(除非是重复),而所有元素在sorted()传递到下游之前在操作中被缓冲.
可以进一步证明,distinct()不需要遍历整个流:
Stream.of(2,1,1,3,5,6,7,9,2,1,3,5,11,10)
.peek(i -> System.out.println("source: "+i))
.distinct()
.peek(i -> System.out.println("distinct: "+i))
.filter(i -> i>2)
.findFirst().ifPresent(i -> System.out.println("found: "+i));
Run Code Online (Sandbox Code Playgroud)
版画
source: 2
distinct: 2
source: 1
distinct: 1
source: 1
source: 3
distinct: 3
found: 3
Run Code Online (Sandbox Code Playgroud)
正如Jose Da Silva的回答所解释和证明的那样,缓冲量可能随着有序并行流而改变,因为部分结果必须在它们传递到下游操作之前进行调整.
由于这些操作在实际终端操作未知之前不会发生,因此可能比OpenJDK中当前发生的更多优化(但可能发生在不同的实现或未来的版本中).例如,sorted().toArray()可以使用并返回相同的数组或者sorted().findFirst()可以变成a min()等.
根据javadoc,distinct和sorted方法都是有状态的中间操作.
该StreamOps说,这个操作如下:
有状态操作可能需要在生成结果之前处理整个输入.例如,在查看流的所有元素之前,不能通过对流进行排序来产生任何结果.因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据.
但是流的收集仅在终端操作(例如toArray,collect或forEach)中发生,两个操作在流水线中处理并且数据流过它.仍然需要注意的一件重要事情是执行此操作的顺序,该distinct()方法的javadoc 说:
对于有序流,不同元素的选择是稳定的(对于重复元素,保留在遇到顺序中首先出现的元素.)对于无序流,不进行稳定性保证.
对于顺序流,当对此流进行排序时,唯一检查的元素是前一个,当未排序时,内部使用HashSet,因此执行distinct后sort会产生更好的性能.
(注意:正如Eugene评论的那样,在这个secuential流中,性能增益可能很小,特别是当代码很热时,但仍然避免创建额外的时间HashSet)
在这里你可以看到更多的秩序distinct和sort:
另一方面,对于并行流,文档说:
保持并行管道中不同()的稳定性相对昂贵(要求操作充当完全屏障,具有大量缓冲开销),并且通常不需要稳定性.如果您的情境的语义允许,使用无序流源(例如generate(Supplier))或使用BaseStream.unordered()删除排序约束可以显着提高并行管道中distinct()的执行效率.
甲全屏障操作意味着:
必须在下游启动之前执行所有上游操作.Stream API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(按顺序并行的情况).
出于这个原因,当使用并行流时,相反的顺序通常更好(只要当前流是无序的),即使用distinctbefore sorted,因为在处理distinct时,sorted可以开始接收元素.
使用相反的顺序,首先排序(无序并行流)然后使用distinct,在两者中都设置一个屏障,首先必须处理所有元素(流)sort,然后全部用于distinct.
这是一个例子:
Function<String, IntConsumer> process = name ->
idx -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom
.current().nextInt(3)); // handle exception or use
// LockSupport.parkNanos(..) sugested by Holger
System.out.println(name + idx);
};
Run Code Online (Sandbox Code Playgroud)
下面的函数接收一个名称,并返回一个从0-2秒休眠然后打印的int消费者.
IntStream.range(0, 8).parallel() // n > number of cores
.unordered() // range generates ordered stream (not sorted)
.peek(process.apply("B"))
.distinct().peek(process.apply("D"))
.sorted().peek(process.apply("S"))
.toArray(); // terminal operation
Run Code Online (Sandbox Code Playgroud)
这将打印,混合B和D,然后是所有S(没有障碍distinct).
如果更改顺序sorted和distinct:
// ... rest
.sorted().peek(process.apply("S"))
.distinct().peek(process.apply("D"))
// ... rest
Run Code Online (Sandbox Code Playgroud)
这将打印,然后是所有B的所有S,然后是所有D(屏障distinct).
如果你想尝试更多的添加unordered之后sorted再次:
// ... rest
.sorted().unordered().peek(process.apply("S"))
.distinct().peek(process.apply("D"))
// ... rest
Run Code Online (Sandbox Code Playgroud)
这将打印,所有B,然后是S和D的混合(distinct再次没有障碍).
编辑:
改变了一些代码,以更好地解释和使用ThreadLocalRandom.current().nextInt(3)sugested.
| 归档时间: |
|
| 查看次数: |
655 次 |
| 最近记录: |