从多个供应商生成一个流

man*_*ton 5 java java-8 java-stream

我正在尝试使用 Java 8 Lambda/Stream API 来建模一个简单的生产者/消费者系统,就像:

    Stream<Sample> samplesFlow = Stream.generate(new SampleSupplier());
    samplesFlow.forEach(new SampleConsumer());
Run Code Online (Sandbox Code Playgroud)

我意识到向多个消费者“扩展”非常简单:

    samplesFlow
        .peek(new SampleConsumer1())
        .peek(new SampleConsumer2())
        .forEach(new SampleConsumer3());
Run Code Online (Sandbox Code Playgroud)

但是如何在系统中添加新的生产者呢?是否有一些惯用的或“优雅的”方式来从几个无限的供应商生成一个流?喜欢:

Stream.generate(new SampleSupplier1(),new SampleSupplier2());  // made it up
Run Code Online (Sandbox Code Playgroud)

每个供应商都模拟一个从远程源获取数据的网络侦听器。

提前致谢!

Hol*_*ger 5

您没有指定要如何组合提供的值。

如果你想让这些值交替,一个解决方案是:

Stream.generate(supplier1).flatMap(x->Stream.of(x, supplier2.get()))
Run Code Online (Sandbox Code Playgroud)

如果你想要某种对,你可以使用

Stream.generate(()->new Pair<>(supplier1.get(), supplier2.get()))
Run Code Online (Sandbox Code Playgroud)

尽管由您来编写Pair该类,因为 jdk 不提供这样的类。(你可以滥用,AbstractMap.SimpleEntry但那很讨厌)。


如果您有一个有限的,Stream您可以使用它Stream.concat来创建一个流,该流将在第二个项目之前处理第一个流的所有项目,但是,Supplier默认情况下,使用 a 创建的流是无限的,因此您必须先limit在第一个流上使用,然后才能将它连接到另一个流,所以它不是一个通用的解决方案。


如果你想查询每一个Supplier恰好一次,你可以使用

Stream.of(supplier1, supplier2).map(Supplier::get)
Run Code Online (Sandbox Code Playgroud)

不过,当然,如果您不需要对Suppliers 进行惰性求值,

Stream.of(supplier1.get(), supplier2.get())
Run Code Online (Sandbox Code Playgroud)

也会这样做。

  • @VGR:这不起作用,因为从“供应商”生成的流是*无限的*,因此您无法一个接一个地处理流。我会扩展我的答案,因为这似乎一遍又一遍地弹出。 (2认同)