复制流以避免"流已经被操作或关闭"

Tob*_*oby 100 java lambda java-8 java-stream

我想复制Java 8流,以便我可以处理它两次.我可以collect作为一个列表并从中获得新的流;

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Run Code Online (Sandbox Code Playgroud)

但我认为应该有一种更有效/更优雅的方式.

有没有办法复制流而不将其转换为集合?

我实际上正在使用Eithers 流,所以想要在移动到正确的投影之前以一种方式处理左投影并以另一种方式处理.有点像这样(到目前为止,我被迫使用这个toList技巧).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
Run Code Online (Sandbox Code Playgroud)

Bri*_*etz 81

我认为你对效率的假设是倒退的.如果您只打算使用一次数据,那么您将获得巨大的效率回报,因为您不必存储它,并且流为您提供强大的"循环融合"优化,使您可以有效地通过管道传输整个数据.

如果要重新使用相同的数据,那么根据定义,您必须生成两次(确定性地)或存储它.如果它恰好在一个集合中,那就太好了; 然后迭代两次便宜.

我们用"分叉流"对设计进行了实验.我们发现支持这个有实际成本; 它以不常见的情况为代价来承担普通案件(使用一次).最大的问题是处理"当两个管道不以相同的速率消耗数据时会发生什么." 现在你还是要恢复缓冲了.这个功能显然没有发挥其重要作用.

如果要重复操作相同的数据,请将其存储,或将操作构建为消费者,并执行以下操作:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
Run Code Online (Sandbox Code Playgroud)

您也可以查看RxJava库,因为它的处理模型更适合这种"流分叉".

  • 流既是_expressive_又是_efficient_.它们具有表现力,因为它们可以让您在阅读代码的过程中设置复杂的聚合操作而不会出现大量意外细节(例如,中间结果).它们也很有效,因为它们(通常)只对数据进行一次传递,而不会填充中间结果容器.这两个属性一起使它们成为许多情况下有吸引力的编程模型.当然,并非所有编程模型都适合所有问题; 您仍然需要决定是否使用适当的工具来完成工作. (10认同)
  • 也许我不应该使用“效率”,我想知道如果我所做的只是立即存储数据(“toList”)以便能够处理它,为什么我还要费心流(而不存储任何东西) (以“任一”情况为例)? (2认同)
  • @NiallConnaughton 我不确定你的意思是。如果你想遍历它两次,就必须有人存储它,或者你必须重新生成它。您是否建议图书馆应该缓冲它,以防有人需要两次?那太愚蠢了。 (2认同)

use*_*679 63

使用java.util.function.Supplier.

来自http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:

重用流

Java 8流不能重用.只要您调用任何终端操作,流就会关闭:

Stream<String> stream =

Stream.of("d2", "a2", "b1", "b3", "c")

.filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok

stream.noneMatch(s -> true);   // exception
Run Code Online (Sandbox Code Playgroud)

在同一个流上的anyMatch之后调用noneMatch会导致以下异常:

java.lang.IllegalStateException: stream has already been operated upon or closed

at 

java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)

at 

java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)

at com.winterbe.java8.Streams5.test7(Streams5.java:38)

at com.winterbe.java8.Streams5.main(Streams5.java:28)
Run Code Online (Sandbox Code Playgroud)

为了克服这个限制,我们必须为我们想要执行的每个终端操作创建一个新的流链,例如,我们可以创建一个流供应商来构建一个新的流,其中已经设置了所有中间操作:

Supplier<Stream<String>> streamSupplier =

    () -> Stream.of("d2", "a2", "b1", "b3", "c")

            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok

streamSupplier.get().noneMatch(s -> true);  // ok
Run Code Online (Sandbox Code Playgroud)

每次调用get()构造一个我们保存的新流,以调用所需的终端操作.

  • 请注意,如果“Stream”是以“昂贵”的方式构建的,则使用“Supplier”,**您将为每次调用“Supplier.get()”**支付该费用。即如果数据库查询...每次都会完成该查询 (3认同)
  • 漂亮而优雅的解决方案。比最受好评的解决方案更多的java8-ish。 (2认同)

Luk*_*der 8

我们duplicate()jOOλ中实现了一个流方法,这是一个我们创建的开源库,用于改进jOOQ的集成测试.基本上,你可以写:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
Run Code Online (Sandbox Code Playgroud)

在内部,有一个缓冲区存储从一个流中消耗但从另一个流中消耗的所有值.如果您的两个流以相同的速率消耗,并且如果您可以忍受缺乏线程安全性,那么这可能是有效的.

以下是算法的工作原理:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final List<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
Run Code Online (Sandbox Code Playgroud)

这里有更多源代码

Tuple2大概是喜欢你的Pair类型,而SeqStream一些增强功能.

  • 此解决方案不是线程安全的:您无法将其中一个流传递给另一个线程.我真的没有看到两个流都可以在单线程中以相同的速率消耗而且实际上需要两个不同的流的情况.如果要从同一个流生成两个结果,那么使用组合收集器(在JOOL中已经存在)会好得多. (2认同)
  • 在这种情况下,您仍将减少一个接一个的流.所以没有必要让生活更加艰难,引入软化的迭代器,无论如何它将整个流收集到引擎盖下的列表中.您可以只是显式地收集到列表,然后在OP告诉它创建两个流(它是相同数量的代码行).好吧,如果第一次减少是短路,你可能只会有一些改进,但它不是OP的情况. (2认同)

ass*_*ias 7

您可以创建一个可运行的流(例如):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);
Run Code Online (Sandbox Code Playgroud)

申请的地点failure和地点success.然而,这将创建相当多的临时对象,并且可能不比从集合开始流式传输/迭代两次更有效.


Ram*_*ams 7

使用供应商为每个终止操作生成流。

Supplier <Stream<Integer>> streamSupplier=()->list.stream();
Run Code Online (Sandbox Code Playgroud)

每当您需要该集合的流时,请使用streamSupplier.get()来获取新的流。

例子:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);


Mar*_*tin 6

多次处理元素的另一种方法是使用Stream.peek(Consumer)

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));
Run Code Online (Sandbox Code Playgroud)

peek(Consumer)可以根据需要多次链接。

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));
Run Code Online (Sandbox Code Playgroud)

  • @HectorJ 另一个线程是关于修改元素的。我认为这里没有完成。 (2认同)