相关疑难解决方法(0)

在flux上使用publishOn和subscribeOn都不会发生任何事情

每当我同时使用subscribeOn和publishOn时,都不会打印任何内容.如果我只使用一个它将打印.如果我使用subscribeOn(Schedulers.immediate())或弹性它的工作原理.任何想法为什么会这样?

我的理解是,publishOn会影响它发布的线程,并在订阅者运行的线程上进行订阅.你能指点我正确的方向吗?

fun test() {
        val testPublisher = EmitterProcessor.create<String>().connect()
        testPublisher
                .publishOn(Schedulers.elastic())
                .map { it ->
                    println("map on ${Thread.currentThread().name}")
                    it
                }
                .subscribeOn(Schedulers.parallel())  
                .subscribe { println("subscribe on ${Thread.currentThread().name}") }
        testPublisher.onNext("a")
        testPublisher.onNext("b")
        testPublisher.onNext("c")
        Thread.sleep(5000)
        println("---")
    }
Run Code Online (Sandbox Code Playgroud)

spring kotlin project-reactor

5
推荐指数
1
解决办法
3292
查看次数

项目反应堆:在订阅和发布的情况下线程将如何创建,流程如何?,堆栈跟踪?

简单的例子来理解线程流程。

  1. [ gshp subscribedOn-1 ] INFOreactor.Flux.FlatMap.1 -onSubscribe ( FluxFlatMap.FlatMapMain )

  2. [ gshppublishOn-7 ] INFOreactor.Flux.FlatMap.1 - onNext(6)

这里reactor.Flux.FlatMap.1对于gshp subscribedOn-1gshppublishOn-7是通用的

当我们运行java时,它从主线程开始,之后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1

  @Test
  public void setUpTestTest() {
      Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
      Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
      Flux<String> flux = Flux.range(1, 200)
                              .flatMap(s-> Flux.just(""+s)
                                               .publishOn(scheduler2)
                                               .concatMap(d->processMessagefluxpause(d, "test")))
                                               .log()
                              .subscribeOn(scheduler1);

    StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)

这意味着什么,流程如何? 在此输入图像描述

java reactive-programming project-reactor spring-webflux

1
推荐指数
1
解决办法
1228
查看次数