在文档中写道,您应该将阻塞代码包装到Mono:http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
但没有写如何实际做到这一点。
我有以下代码:
@PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
something.subscribe(something -> {
// some blocking operation
});
// how to return Mono<Void> here?
}
Run Code Online (Sandbox Code Playgroud)
我在这里遇到的第一个问题是我需要归还一些东西,但我不能。例如,如果我返回 a,Mono.empty则请求将在通量工作完成之前关闭。
第二个问题是:我如何实际包装阻塞代码,就像文档中建议的那样:
Mono blockingWrapper = Mono.fromCallable(() -> {
return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());
Run Code Online (Sandbox Code Playgroud)
您不应subscribe在控制器处理程序中调用,而应构建反应式管道并返回它。最终,HTTP 客户端将请求数据(通过 Spring WebFlux 引擎),这就是向管道订阅和请求数据的原因。
手动订阅会将请求处理与其他操作分离,这将 1) 消除有关操作顺序的任何保证,2) 如果其他操作正在使用 HTTP 资源(例如请求正文),则会中断处理。
在这种情况下,源不会阻塞,只有变换操作会阻塞。因此,我们最好使用publishOn来表示链的其余部分应该在特定的调度程序上执行。如果这里的操作是 I/O 密集型的,那么Schedulers.elastic()是最好的选择,如果它是 CPU 密集型的,那么Schedulers .paralell是更好的选择。这是一个例子:
@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {
return something.collectList()
.publishOn(Schedulers.elastic())
.map(things -> {
return processThings(things);
})
.then();
}
public ProcessingResult processThings(List<Something> things) {
//...
}
Run Code Online (Sandbox Code Playgroud)
有关该主题的更多信息,请查看reactor文档中的调度程序部分。如果您的应用程序倾向于执行很多类似的操作,那么您将失去反应式流的许多优势,并且您可能会考虑切换到基于 Servlet 的模型,在该模型中您可以相应地配置线程池。
| 归档时间: |
|
| 查看次数: |
4655 次 |
| 最近记录: |