FlatMap 何时会同时监听多个源?

Sta*_*lfi 0 java spring reactive-programming project-reactor reactive-streams

哪些情况会导致Flux::flatMap同时收听多个源(0...无穷大)?


我在实验时发现,当上游向flatMap线程内发送信号时thread-upstream-1,并且存在NflatMap 将侦听的内部流,并且每个内部流都在不同的线程中发送信号:thread-inner-stream-ifor 1<=i<=N,而不是每个1<=i<=Nif thread-upstream-1 != thread-inner-stream-iflatMap同时侦听所有内部流溪流。

我认为这并不完全正确,我错过了一些其他场景。

Sim*_*slé 7

flatMap不执行任何并行工作,例如:它不更改线程。最简单的例子是

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose
Run Code Online (Sandbox Code Playgroud)

这打印:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO  reactor.Flux.FlatMap.1 - onComplete()
Run Code Online (Sandbox Code Playgroud)

如您所见,仅在 中生成mainpublishOn如果在初始范围后添加 a ,flatMap则publishOn 将切换到同一单线程中生成所有内容。

flatMap然而,它的作用是订阅多个inner Publisher,最多可达concurrency默认值为Queues.SMALL_BUFFER_SIZE(256)的参数。

这意味着,如果将其设置为3flatMap则会将 3 个源元素映射到其内部Publisher并订阅这些发布者,但在开始映射更多源元素之前将等待至少一个完成。

如果内部Publisher使用publishOnor subscribeOn, thenflatMap自然会让它们的事件发生在 then 定义的线程中:

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(v * 10, 2)
                      .publishOn(Schedulers.newParallel("foo", 3)))
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose
Run Code Online (Sandbox Code Playgroud)

哪个打印:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onComplete()
Run Code Online (Sandbox Code Playgroud)