制作流副本的最有效方法是什么?

Jer*_*cks 6 java java-8 java-stream

我有一个方法,在流上执行处理.部分处理需要在一个锁的控制下完成 - 一个锁定部分用于处理所有元素 - 但其中一部分没有(并且不应该因为它可能非常耗时).所以我不能只说:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess);
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing);
}
toPostProcess.map(this::postProcess).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

因为调用doLockedProcessing只会在forEach调用终端操作时执行,并且在锁定之外.

所以我认为我需要在每个阶段使用终端操作制作流的副本,以便在正确的时间完成正确的位.就像是:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).copy();
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).copy();
}
toPostProcess.map(this::postProcess).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

当然,该copy()方法不存在,但如果确实如此,它将对流执行终端操作并返回包含所有相同元素的新流.

我知道实现这一目标的几种方法:

(1)通过数组(如果元素类型是泛型类型,则不是那么容易):

copy = Stream.of(stream.toArray(String[]::new));
Run Code Online (Sandbox Code Playgroud)

(2)通过列表:

copy = stream.collect(Collectors.toList()).stream();
Run Code Online (Sandbox Code Playgroud)

(3)通过流构建器:

Stream.Builder<V> builder = Stream.builder();
stream.forEach(builder);
copy = builder.build();
Run Code Online (Sandbox Code Playgroud)

我想知道的是:这些方法中哪一种在时间和记忆方面最有效?或者还有另一种方式更好吗?

Fed*_*ner 3

我认为您已经提到了所有可能的选择。没有其他结构性方法可以满足您的需要。首先,您必须使用原始流。然后,创建一个新流,获取锁并使用这个新流(从而调用锁定操作)。最后,创建一个更新的流,释放锁并继续处理这个更新的流。

从您正在考虑的所有选项中,我会使用第三个,因为它可以处理的元素数量仅受内存限制,这意味着它没有隐式最大大小限制,就像 ie 那样(ArrayList它可以包含大约Integer.MAX_VALUE元素)。

不用说,这将是一项相当昂贵的操作,无论是时间还是空间。你可以这样做:

Stream<V> temp = Stream.of(objects)
        .map(this::preProcess)
        .collect(Stream::<V>builder,
                 Stream.Builder::accept,
                 (b1, b2) -> b2.build().forEach(b1))
        .build();

synchronized (lockObj) {
    temp = temp
            .map(this::doLockedProcessing)
            .collect(Stream::<V>builder,
                     Stream.Builder::accept,
                     (b1, b2) -> b2.build().forEach(b1))
            .build();
}

temp.map(this::postProcess).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

请注意,我使用了单个Stream实例temp,以便在需要时可以对中间流(及其构建器)进行垃圾收集。


正如 @Eugene 在评论中所建议的,最好有一个实用方法来避免代码重复。这是这样的方法:

public static <T> Stream<T> copy(Stream<T> source) {
    return source.collect(Stream::<T>builder,
                          Stream.Builder::accept,
                          (b1, b2) -> b2.build().forEach(b1))
                 .build();
}
Run Code Online (Sandbox Code Playgroud)

然后,您可以按如下方式使用此方法:

Stream<V> temp = copy(Stream.of(objects).map(this::preProcess));

synchronized (lockObj) {
    temp = copy(temp.map(this::doLockedProcessing));
}

temp.map(this::postProcess).forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

  • @Eugene,旋转缓冲区是一个数组数组,因此当目标数组的容量耗尽时,一个新数组将被添加到外部数组中,而不是替换该数组。因此与 ArrayList 不同,它在扩展时不需要复制所有包含的元素。当外部数组的容量耗尽时,调整它的大小只需要廉价地复制数组引用,但无论如何你都超出了普通集合的容量。因此进行随机访问会更复杂,但这就是为什么它不是“List”。`Stream.Builder` 添加了对空和单元素情况的特殊支持。 (3认同)