我可以在Java 8中复制Stream吗?

nec*_*cer 54 java java-8 java-stream

有时我想在流上执行一组操作,然后以不同的方式处理结果流与其他操作.

我是否可以这样做而无需指定两次常见的初始操作?

例如,我希望dup()存在以下方法:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
Run Code Online (Sandbox Code Playgroud)

nos*_*sid 47

无法以这种方式复制流.但是,您可以通过将公共部分移动到方法或lambda表达式来避免代码重复.

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);
Run Code Online (Sandbox Code Playgroud)

  • @necromancer:谢谢你的提问.随意更改已接受的答案. (7认同)
  • 我正在考虑将接受的答案转换为 Elazar 的答案,并链接到你的答案,作为第二个解决方案的一个很好的例子,以及我在我的问题中使用的具体例子的解决方案。希望没关系。谢谢! (3认同)

Ela*_*zar 28

一般来说这是不可能的.

如果要复制输入流或输入迭代器,则有两个选项:

答:把所有东西放在一个集合中,比如说 List<>

假设您将流复制到两个流s1s2.如果您拥有高级n1元素s1n2元素s2,则必须将|n2 - n1|元素保留在内存中,以保持同步.如果您的流是无限的,则可能没有所需存储的上限.

看看Python的内容tee(),看看它需要什么:

这个itertool可能需要大量的辅助存储(取决于需要存储多少临时数据).一般来说,如果一个迭代器使用了大部分或全部数据的另一迭代开始前,它是更快地使用list()替代tee().

B.如果可能:复制创建元素的生成器的状态

要使此选项起作用,您可能需要访问流的内部工作方式.换句话说,生成器 - 创建元素的部分 - 应该首先支持复制.[OP:看到这个好的答案,作为如何在问题中为例子做到这一点的一个例子]

它不适用于用户的输入,因为您必须复制整个"外部世界"的状态.Java Stream不支持复制,因为它设计得尽可能通用,专门用于处理文件,网络,键盘,传感器,随机性等.[OP:另一个例子是按需读取温度传感器的流.不存储读数副本就不能复制]

这不仅仅是Java中的情况; 这是一般规则.您可以看到,std::istream在C++中,仅支持移动语义,而不支持复制语义("复制构造函数(已删除)"),因此(以及其他).

  • +1 很棒的答案;可能会接受并链接到当前接受的答案作为“B”点的具体示例。 (2认同)
  • 阻塞队列将是一种允许有界存储问题的解决方案,其中第一个流的读取器将被阻塞,直到第二个流被消耗为止.自然,并不总是适用,但可能适用于某些用例esp.有一个大缓冲区. (2认同)

Adr*_*ian 7

从 Java 12 开始,我们Collectors::teeing可以将主流管道的元素传递给 2 个或更多下游收集器。

根据您的示例,我们可以执行以下操作:

@Test
void shouldProcessStreamElementsInTwoSeparateDownstreams() {
    class Result {
        List<Integer> multiplesOf7;
        List<Integer> multiplesOf5;

        Result(List<Integer> multiplesOf7, List<Integer> multiplesOf5) {
            this.multiplesOf7 = multiplesOf7;
            this.multiplesOf5 = multiplesOf5;
        }
    }

    var result = IntStream.range(1, 100)
            .filter(n -> n % 2 == 0)
            .boxed()
            .collect(Collectors.teeing(
                    Collectors.filtering(n -> n % 7 == 0, Collectors.toList()),
                    Collectors.filtering(n -> n % 5 == 0, Collectors.toList()),
                    Result::new
            ));

    assertTrue(result.multiplesOf7.stream().allMatch(n -> n % 7 == 0));
    assertTrue(result.multiplesOf5.stream().allMatch( n -> n % 5 == 0));
}
Run Code Online (Sandbox Code Playgroud)

还有许多其他收集器允许执行其他操作,例如通过Collectors::mapping在下游使用,您可以从同一源获取两个不同的对象/类型,如本文所示。


Luk*_*der 6

如果你正在缓冲你在一个副本中消耗的元素,但在另一个副本中却没有.

我们duplicate()jOOλ中实现了一个流方法,这是一个我们创建的开源库,用于改进jOOQ的集成测试.基本上,你可以写:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();
Run Code Online (Sandbox Code Playgroud)

(注意:我们目前需要打包流,因为我们尚未实现IntSeq)

在内部,有一个LinkedList缓冲区存储从一个流中消耗但从另一个流中消耗的所有值.如果您的两个流以相同的速率消耗,那么这可能是有效的.

以下是算法的工作原理:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
Run Code Online (Sandbox Code Playgroud)

这里有更多源代码

事实上,使用jOOλ,你将能够像这样写一个完整的单行:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

  • 谢谢,但当前接受的答案确实指出:“如果您在 s1 和 n2 元素中使用了 s2 高级 n1 元素,则必须将 |n2 - n1| 元素保留在内存中,以跟上步伐。如果您的流是无限的,则会有对所需的存储没有上限。” (3认同)

Tom*_*RKA 5

您还可以将流生成移动到单独的方法/函数中,该方法/函数返回此流并调用它两次。