在 Reactor 中进行阻塞调用时,publishOn 与 subscribeOn

kou*_*sen 5 project-reactor reactive-streams

在博客文章Flight of the Flux 3中,作者建议将同步阻塞调用包装在Monowith a subscribeOncall 中,如文章中的以下代码片段所示:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}
Run Code Online (Sandbox Code Playgroud)

但在同一篇文章的前面,他们表明您可以使用它publishOn来确保在单独的线程上完成阻塞调用:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Run Code Online (Sandbox Code Playgroud)

既然如此,为什么不直接betterFetchUrls使用方法来实现呢publishOn

final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url));
}
Run Code Online (Sandbox Code Playgroud)

那不是更简单吗?附录 C中的 Reactor 参考手册也在 aMono和 a中进行了包装调用subscribeOn,因此我认为一定有一个首选原因,但我无法弄清楚该原因可能是什么。

感谢您的任何见解。

Pra*_*dey 0

您的代码实际上很好,但您必须使用 aflatMap而不是 a mapas:

final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .publishOn(Schedulers.boundedElastic())
        .flatMap(url -> blockingWebClient.get(url));
}
Run Code Online (Sandbox Code Playgroud)

两者subscribeOnpublishOn进行上下文切换,因此两者都是相当昂贵的操作(尽管我不知道哪一个更昂贵,但我会喜欢一些有关此的文档)。但通常接受的进行阻塞调用的方法是使用subscribeOn文档中提到的 a 。