项目反应堆:在订阅和发布的情况下线程将如何创建,流程如何?,堆栈跟踪?

G S*_*Ari 1 java reactive-programming project-reactor spring-webflux

简单的例子来理解线程流程。

  1. [ gshp subscribedOn-1 ] INFOreactor.Flux.FlatMap.1 -onSubscribe ( FluxFlatMap.FlatMapMain )

  2. [ gshppublishOn-7 ] INFOreactor.Flux.FlatMap.1 - onNext(6)

这里reactor.Flux.FlatMap.1对于gshp subscribedOn-1gshppublishOn-7是通用的

当我们运行java时,它从主线程开始,之后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1

  @Test
  public void setUpTestTest() {
      Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
      Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
      Flux<String> flux = Flux.range(1, 200)
                              .flatMap(s-> Flux.just(""+s)
                                               .publishOn(scheduler2)
                                               .concatMap(d->processMessagefluxpause(d, "test")))
                                               .log()
                              .subscribeOn(scheduler1);

    StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)

这意味着什么,流程如何? 在此输入图像描述

Tho*_*olf 5

在这里您可以阅读有关subscribeOn和 的内容publishOn

发布与订阅

基本上,一旦有人订阅,整个链就会被创建,并且调用会被分配一个线程。如果链中的任何位置都有a,则整个调用都将使用此调度程序。所以放在subscribeOn哪里并不重要。subscribeOn

您可以在日志中看到,它开始时,调用被放置在subscribeOn调度程序上。

另一方面publishOn,一旦我们到达该语句,线程就会中途切换到该调度程序。所以这更取决于它在链中的位置。

您的日志显示,当内部 Flux 发出时,它是在publishOn调度程序上发出的。