Flux.create和Flux.generate之间的差异

JJ *_*kar 12 project-reactor

Flux.create和之间有什么区别Flux.generate?我正在寻找 - 理想情况下使用示例用例 - 来了解何时应该使用其中一个.

Sta*_*lfi 19

简而言之:

Flux::create没有对应用程序状态的变化做出反应Flux::generate.


长版

Flux::create

当您想要计算多个(0 ...无穷大)值时,您将使用它,这些值不受应用程序状态和管道状态的影响(管道 == Flux::create == 下游之后的操作链) .

为什么?因为您发送的方法会Flux::create持续计算元素(或无).在下游将确定有多少元素(==元素旁边信号吧)想如果他能跟不上,在一些战略缓冲,其已经发出将被删除那些元素/(默认情况下它们将被缓冲,直到下游的意志要求更多).

第一个也是最简单的用例是用于发出值,理论上,这些值可以汇总到一个集合中,然后只取每个元素并对其执行某些操作:

Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
    /* get all the latest article from a server and
       emit them one by one to downstream. */
    List<String> articals = getArticalsFromServer();
    articals.forEach(sink::next);
});
Run Code Online (Sandbox Code Playgroud)

如您所见,Flux.create用于阻塞方法(getArticalsFromServer)与异步代码之间的交互.

我确定还有其他用例Flux.create.


Flux::generate

Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
        synchronousSink.next(1);
    })
    .doOnNext(number -> System.out.println(number))
    .doOnNext(number -> System.out.println(number + 4))
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

输出将是 1 5 1 5 1 5................forever

您发送给的方法的每次调用Flux::generate,synchronousSink只能发出:onSubscribe onNext? (onError | onComplete)?.

这意味着Flux::generate按需计算和发出值.你应该什么时候使用它?在情况下,它太贵计算可能未使用的元素下游或你发出的是应用的状态,或者从您的状态受到影响的事件管道(您管道 ==操作链,其自带之后 == 下游). Flux::create

例如,如果您正在构建torrent应用程序,那么您实时接收数据块.您可以使用Flux::generate将任务(块下载)分配给多个线程,Flux::generate只有当某个线程询问时,您才会计算要在其中下载的块.所以你只会发出你没有的块.相同的算法Flux::create将失败,因为Flux::create将发出我们没有的所有块,如果某些块无法下载,那么我们就会遇到问题.因为Flux::create没有对应用程序状态的变化作出反应Flux::generate.


vin*_*ins 14

创建:

在此处输入图片说明

  • 接受一个 Consumer<FluxSink<T>>
  • 每个订阅者只调用一次消费者
  • 消费者可以立即发出 0..N 个元素
  • 发布者不知道下游状态。所以我们需要提供溢出策略作为附加参数
  • 我们可以获得FluxSink的引用,使用它我们可以在需要时使用多线程继续发射元素。

产生:

在此处输入图片说明

  • 接受一个 Consumer<SynchronousSink<T>>
  • 根据下游需求一次次调用消费者
  • 消费者最多只能发出一个元素,并带有可选的完成/错误信号。
  • 发布者根据下游需求生产元素
  • 我们可以得到SynchronousSink的引用。但它可能不是很有用,因为我们只能发出一个元素

查看此博客了解更多详情。