kou*_*sen 5 project-reactor reactive-streams
在博客文章Flight of the Flux 3中,作者建议将同步阻塞调用包装在Monowith a subscribeOncall 中,如文章中的以下代码片段所示:
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
Run Code Online (Sandbox Code Playgroud)
但在同一篇文章的前面,他们表明您可以使用它publishOn来确保在单独的线程上完成阻塞调用:
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Run Code Online (Sandbox Code Playgroud)
既然如此,为什么不直接betterFetchUrls使用方法来实现呢publishOn?
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url));
}
Run Code Online (Sandbox Code Playgroud)
那不是更简单吗?附录 C中的 Reactor 参考手册也在 aMono和 a中进行了包装调用subscribeOn,因此我认为一定有一个首选原因,但我无法弄清楚该原因可能是什么。
感谢您的任何见解。
您的代码实际上很好,但您必须使用 aflatMap而不是 a mapas:
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.publishOn(Schedulers.boundedElastic())
.flatMap(url -> blockingWebClient.get(url));
}
Run Code Online (Sandbox Code Playgroud)
两者subscribeOn都publishOn进行上下文切换,因此两者都是相当昂贵的操作(尽管我不知道哪一个更昂贵,但我会喜欢一些有关此的文档)。但通常接受的进行阻塞调用的方法是使用subscribeOn文档中提到的 a 。
| 归档时间: |
|
| 查看次数: |
3520 次 |
| 最近记录: |