dyy*_*lex 7 java reactive-programming project-reactor
我有一个异步调用节俭接口:
public CompletableFuture<List<Long>> getFavourites(Long userId){
CompletableFuture<List<Long>> future = new CompletableFuture();
OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
callback.addObserver(new OctoObserver() {
@Override
public void onSuccess(Object o) {
future.complete((List<Long>) o);
}
@Override
public void onFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
});
try {
recommendAsyncService.getFavorites(userId, callback);
} catch (TException e) {
log.error("OctoCall RecommendAsyncService.getFavorites", e);
}
return future;
}
Run Code Online (Sandbox Code Playgroud)
现在它返回一个 CompletableFuture<List>。然后我用 Flux 调用它来做一些处理器。
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
// do not like it
List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
System.out.println(recommendList);
return Flux.fromIterable(recommendList)
.flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
.userId(userId)
.productId(id)
.productType((int) (Math.random()*100))
.build())))
.take(5)
.publishOn(mdpScheduler);
}
Run Code Online (Sandbox Code Playgroud)
但是,我想从getFavourites方法中获取 Flux 并且可以在getRecommend方法中使用它。
或者,您可以推荐一个Flux API,我可以将其转换List<Long> recommendList为Flux<Long> recommendFlux。
小智 5
简单的解决方案是使用Flux.fromIterable如下例所示
public Flux<Integer> fromListToFlux(){
List<Integer> intList = Arrays.asList(1,2,5,7);
return Flux.fromIterable(intList);
}
Run Code Online (Sandbox Code Playgroud)
Springboot版本为3.1.0
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
Run Code Online (Sandbox Code Playgroud)
注意:不建议这样做,因为当您在反应式管道中工作(此处创建列表)时,它不会完全反应。
要将 a 转换CompletableFuture<List<T>>为 a,Flux<T>您可以使用Mono#fromFuturewith Mono#flatMapMany:
var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
List<T>在回调中异步接收的也可以转换为 aFlux<T>而不使用CompletableFuture. 您可以直接Mono#create使用Mono#flatMapMany:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
sink.success(list);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
}).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
或者简单地Flux#create在一次传递中使用多个发射:
Flux<Long> flux = Flux.create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
list.forEach(sink::next);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
});
flux.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
29040 次 |
| 最近记录: |