Avi*_*and 3 spring reactive-programming spring-boot project-reactor spring-webflux
我的 Spring boot 应用程序中有一个如下所示的方法。
public Flux<Data> search(SearchRequest request) {
Flux<Data> result = searchService.search(request);//this returns Flux<Data>
Mono<List<Data>> listOfData = result.collectList();
// doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
return result;
}
//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
//do some processing here
}
Run Code Online (Sandbox Code Playgroud)
目前,我正在使用带@Async注释的服务类doThisAsync,但不知道如何传递List<Data>,因为我不想调用block. 我所拥有的只是Mono<List<Data>>.
我的主要问题是如何单独处理这个 Mono 并且该search方法应该返回Flux<Data>.
Mar*_*nyi 15
Mono/Fluxpublic Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> doThisAsync(List<Data> data) {
//do some async/non-blocking processing here like calling WebClient
}
Run Code Online (Sandbox Code Playgroud)
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
.subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow
.subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public void doThisAsync(List<Data> data) {
//do some blocking I/O on calling thread
}
Run Code Online (Sandbox Code Playgroud)
请注意,在上述两种情况下,您都失去了背压支持。如果由于doAsyncThis某种原因变慢,那么数据生产者不会关心并继续生产项目。这是火与雾机制的自然结果。
| 归档时间: |
|
| 查看次数: |
4432 次 |
| 最近记录: |