Project Reactor 3 中的 publishOn 与 subscribeOn

Kay*_*ayV 13 publisher publish-subscribe reactive-programming project-reactor reactive-streams

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?

Ism*_*ush 14

我花了一些时间才理解它,可能是因为publishOn之前通常解释过subscribeOn,这里有一个希望更简单的外行解释。

subscribeOn意味着subscribe(), onSubscribe() and request()在指定的调度程序工作者(其他线程)上运行初始源发射,并且对于任何后续操作也是相同的,例如onNext/onError/onComplete, map etc,无论 subscribeOn() 的位置如何,都会发生这种行为

如果你没有publishOn在流畅的调用中做任何事情,那么一切都会在这样的线程上运行。

但是,只要您publishOn()在中间调用,那么任何后续操作员调用都将在提供的调度程序工作器上运行到此类publishOn().

这是一个例子

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();
Run Code Online (Sandbox Code Playgroud)

结果是


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

Run Code Online (Sandbox Code Playgroud)

正如您所看到的,第一个doOnNext()和下面map()的线程在被调用的线程上运行subscribeOn_thread,直到任何publishOn()被调用的线程都发生这种情况,然后任何后续调用都将在提供的调度程序上运行,publishOn()并且在任何后续调用中都会发生这种情况,直到有人调用另一个publishOn()

  • 如果您忘记了 map 每个周期执行一次而不是提前执行的事实,那么您的代码将是一个痛苦的脑筋急转弯,“Inside map”使用相同的线程名称打印这一事实是对 subscribeOn 如何更改的一个很棒的解释发射的线程以及publishOn如何根据链的位置而不是其调用的时刻来改变执行线程。很不错 (2认同)

Kay*_*ayV 8

这是我得到的一个小文档:

在订阅者链的中间,publishOn 的应用方式与任何其他运营商相同。它从下游获取信号并在上游重放它们,同时从关联的调度程序对工作线程执行回调。因此,它会影响后续运算符的执行位置(直到链接另一个 publishOn)。

subscribeOn 适用于订阅过程,当反向链被构建时。因此,无论您将 subscribeOn 放在链中的哪个位置,它始终会影响源发射的上下文。但是,这不会影响对 publishOn 的后续调用的行为。他们仍然为后面的链部分切换执行上下文。

publishOn 强制下一个操作符(可能还有下一个操作符之后的后续操作符)在不同的线程上运行。类似地, subscribeOn 强制前一个操作符(也可能是前一个操作符之前的操作符)在不同的线程上运行。

  • 准确地说,“publishOn”应该是从上游到下游,而不是相反 (5认同)