Infinite Java Stream和Reactor Flux之间的区别

Fel*_*lix 4 java reactor flux java-stream spring-webflux

我正在尝试找出无限流和无限磁通量(如果有)之间的概念差异。

为此,我想出了以下示例来说明无限的流/通量

@Test
public void infinteStream() {

  //Prints infinite number of integers
  Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1);

  infiniteStream.forEach(System.out::println);
}

@Test
public void infiniteFlux()  {

   //Prints infinite number of date strings (every second)
   Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
            .map(t -> LocalDateTime.now());

    localDateTimeFlux.subscribe(t -> System.out.println(t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))));
}
Run Code Online (Sandbox Code Playgroud)

关于这些示例,我有一个问题:是否有带Flux的infinteStream()和带Stream的infinteFlux()的模拟?而且,更一般而言,无限流和磁通量之间有什么区别吗?

在此先感谢Felix

Bri*_*zel 8

Stream并且Flux有很大的不同:

  • Stream 只能使用一次,而您可以多次订阅 Flux
  • Stream基于拉(使用一个元素调用下一个元素)与Flux拥有混合推/拉模型,发布者可以推入元素,但仍然必须尊重消费者发出的背压
  • Stream是同步序列还是Flux可以表示异步序列

在示例中,您使用生成了一个无限的值序列Stream,它们的生成和消耗要尽可能快。在您的Flux示例中,您以固定的时间间隔生成值(我不确定您可以使用Stream)。使用Flux,您也可以Flux.generate无间隔地进行排序,就像您的Stream示例一样。

通常,您可以考虑FluxStream+ CompletableFuture与以下项混合使用:

  • 很多强大的运营商
  • 背压支持
  • 控制发布者和订阅者的行为
  • 控制时间的概念(缓冲值窗口,添加超时和后退等)
  • 为通过网络(从数据库或远程Web API)获取的异步序列量身定制的内容

  • 在什么情况下你应该使用流超过通量? (2认同)