Project Reactor - 订阅并行调度程序不起作用

wst*_*wst 0 kotlin project-reactor reactive-streams

我正在查看示例并阅读文档,在尝试以并行方式订阅 Flux 时发现了一些问题。

我有 3 个功能,如下所示。

private val log = LoggerFactory.getLogger("main")
private val sequence = Flux.just(1, 2)

fun a() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
    sequence.subscribe { log.info(">>> {}", it) }
}

fun b() {
    sequence.subscribe { log.info(">>> {}", it) }
}

fun c() {
    sequence.subscribeOn(Schedulers.parallel()).subscribe { log.info("*** {}", it) }
}
Run Code Online (Sandbox Code Playgroud)

a()现在,当我单独运行每个方法时,我从函数和中获得了正确的输出b(),但从 的输出c()为空。这是预期的吗?是设计使然吗?如果是这样,为什么会发生这种情况?

Sim*_*slé 5

Flux.just(...)捕获值,因此被优化为在订阅中立即执行Thread

当您使用 时subscribeOn,您可以将订阅更改Threadmain其他内容,从而实现just真正的异步。

在 中a(),如果没有subscribeOn那一秒just,主线程就会阻塞,以致测试在异步替代方案完成之前无法完成。

在 中c(),不存在这样的main线程阻塞。因此,测试在异步just有时间发出任何内容之前终止,这就是您看不到输出的原因。

为了使其更加明显,请添加 a Thread.sleep(10),您将看到一些输出。