在flux上使用publishOn和subscribeOn都不会发生任何事情

Val*_*ahu 5 spring kotlin project-reactor

每当我同时使用subscribeOn和publishOn时,都不会打印任何内容.如果我只使用一个它将打印.如果我使用subscribeOn(Schedulers.immediate())或弹性它的工作原理.任何想法为什么会这样?

我的理解是,publishOn会影响它发布的线程,并在订阅者运行的线程上进行订阅.你能指点我正确的方向吗?

fun test() {
        val testPublisher = EmitterProcessor.create<String>().connect()
        testPublisher
                .publishOn(Schedulers.elastic())
                .map { it ->
                    println("map on ${Thread.currentThread().name}")
                    it
                }
                .subscribeOn(Schedulers.parallel())  
                .subscribe { println("subscribe on ${Thread.currentThread().name}") }
        testPublisher.onNext("a")
        testPublisher.onNext("b")
        testPublisher.onNext("c")
        Thread.sleep(5000)
        println("---")
    }
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 17

subscribeOn而不是影响订阅发生的地方.也就是说,触发源发出元素的初始事件.该SubscriberonNext,另一方面钩子在最接近的影响publishOn在环比上涨(非常喜欢你map).

但是EmitterProcessor,像大多数Processors一样,它更先进,可以做一些偷工作.我不确定为什么你没有在你的情况下打印任何东西(你的样本转换为Java在我的机器上工作),但我敢打赌它与该处理器有关.

此代码将更好地演示subscribeOnvs publishOn:

Flux.just("a", "b", "c") //this is where subscription triggers data production
        //this is influenced by subscribeOn
        .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
        .publishOn(Schedulers.elastic())
        //the rest is influenced by publishOn
        .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.parallel())
        .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
    Thread.sleep(5000);
Run Code Online (Sandbox Code Playgroud)

打印出:

before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2
Run Code Online (Sandbox Code Playgroud)

  • 在这种情况下,您不一定需要这两个,subscribeOn 应该足以更改初始线程 (2认同)