在执行其他操作之前查找Stream大小

Jus*_*ent 13 java concurrency java-stream

在我的节目,我多次1 的Java 8 ,以减少对象的集合到一个单一的一个.在整个执行过程中,此集合的大小可能会有很大差异:从3个对象到数百个.

public void findInterestingFoo(Stream<Foo> foos) {
    internalState.update(foos.collect(customCollector()));
}
Run Code Online (Sandbox Code Playgroud)

在优化我的代码和搜索瓶颈的过程中,我在某些时候使流并行.这在当时很有效,因为收藏品都很大.之后,在更改程序的其他部分和参数后,集合变得更小.我意识到使流并行更有效.这是有道理的:在4个对象的多个线程上分配工作的开销根本不值得.但是对于数百个对象来说它值得的.

如果我只能使大流并行,那将非常方便:

public void findInterestingFoo(Stream<Foo> foos) {
    if (isSmall(foos)) {
        internalState.update(foos.collect(customCollector()));
    } else {
        internalState.update(foos.parallel().collect(customCollector()));
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,当从数组,集合手动创建流时,可以手动执行此操作.也就是说,我们知道流中有哪些元素,因此可以跟踪它.然而,我有兴趣以通用的方式解决这个问题,因此无论传递什么类型的流findInterestingFoo,它都会得到适当的处理和尽可能高效的处理.

count()可能有帮助的东西,除了它在我收集它之前终止流.

我很清楚,流被设计为没有设置大小,特别是:

  • 可能是无限的.虽然集合的大小有限,但流不需要.短路操作,例如limit(n)findFirst()可以允许无限流上的计算在有限时间内完成.- java.util.stream包装说明

不过,我想知道在对它执行任何操作之前是否有任何方法可以确定流中有多少元素.流是否真的不知道它是从有限集合创建的?

__________
1数千次.在我的情况下,优化这导致总运行时间从大约1.5到0.5秒的加速.

Hol*_*ger 16

从理论上讲,你可以这样做:

public void findInterestingFoo(Stream<Foo> foos) {
    Spliterator<Foo> sp = foos.spliterator();
    long size = sp.getExactSizeIfKnown();// returns -1 if not known
          // or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
    internalState.update(
        StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
                     .collect(customCollector()));
}
Run Code Online (Sandbox Code Playgroud)

spliterator()是消耗在输入流的终端的操作,但可以传递SpliteratorStreamSupport.stream构造一个流具有完全相同的特性.第二个参数已经告诉流是否应该是并行的.

理论上.

实际上,当前流实现将Spliterator根据流是否并行返回不同的实现.这意味着将流重新创建为并行流可能最终会导致当原始流在调用之前尚未并行时无法进行并行处理的流spliterator().

但是,如果没有中间操作,例如直接Stream从集合或数组传入创建时,它确实可以正常工作.

如果您决定这样做,parallel()之前调用spliterator()以获得可能仍然按顺序运行的并行功能流,可以在很多情况下工作.但是,如果sorted()在输入流中存在有状态的中间操作,则它们可能会被修复为并行运行,即使您collect按顺序执行(反之亦然).


另一个问题是基本性质.元素的数量实际上并没有说明并行处理是否会带来好处.这取决于每个元素的工作负载,这不仅取决于您的终端collect操作,还取决于在输入方法之前已链接到流的操作.即使你认为你的集热器的工作量已经高到足以值得并行处理,这也许是因为进入的流具有操作一样skip,limitdistinct(在有序流),这往往会遇到更糟糕的并行,将需要一个完全不同的阈值.

一个更简单的解决方案是让呼叫者决定,因为呼叫者知道流的大小和性质.您甚至不需要为方法的签名添加选项,因为调用者可以在将其传递给您的方法之前通过调用parallel()sequential()在流上做出决定,并且您可以通过简单地不更改模式来尊重它.

  • @Eugene一个`DISTINCT`标志并没有告诉你链中有一个昂贵的`distinct()`操作.流可以是自然不同的,因为源是"Set".甚至`IntStream.range(...)`产生不同的值并具有完美的并行性能.简单地说,没有办法说出来.此外,还有其他源属性可能会影响并行性能,例如,只考虑我们可能会流过文件的行.通常,假设并行处理不好的主要原因在于呼叫方.另一端只知道*附加*障碍. (2认同)