每当我同时使用subscribeOn和publishOn时,都不会打印任何内容.如果我只使用一个它将打印.如果我使用subscribeOn(Schedulers.immediate())或弹性它的工作原理.任何想法为什么会这样?
我的理解是,publishOn会影响它发布的线程,并在订阅者运行的线程上进行订阅.你能指点我正确的方向吗?
fun test() {
val testPublisher = EmitterProcessor.create<String>().connect()
testPublisher
.publishOn(Schedulers.elastic())
.map { it ->
println("map on ${Thread.currentThread().name}")
it
}
.subscribeOn(Schedulers.parallel())
.subscribe { println("subscribe on ${Thread.currentThread().name}") }
testPublisher.onNext("a")
testPublisher.onNext("b")
testPublisher.onNext("c")
Thread.sleep(5000)
println("---")
}
Run Code Online (Sandbox Code Playgroud) 简单的例子来理解线程流程。
[ gshp subscribedOn-1 ] INFOreactor.Flux.FlatMap.1 -onSubscribe ( FluxFlatMap.FlatMapMain )
[ gshppublishOn-7 ] INFOreactor.Flux.FlatMap.1 - onNext(6)
这里reactor.Flux.FlatMap.1对于gshp subscribedOn-1和gshppublishOn-7是通用的
当我们运行java时,它从主线程开始,之后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1?
@Test
public void setUpTestTest() {
Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
Flux<String> flux = Flux.range(1, 200)
.flatMap(s-> Flux.just(""+s)
.publishOn(scheduler2)
.concatMap(d->processMessagefluxpause(d, "test")))
.log()
.subscribeOn(scheduler1);
StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)