小编dyy*_*lex的帖子

如何使用 Reactor 3.x 将 List<T> 转换为 Flux<T>

我有一个异步调用节俭接口:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}
Run Code Online (Sandbox Code Playgroud)

现在它返回一个 CompletableFuture<List>。然后我用 Flux 调用它来做一些处理器。

public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

7
推荐指数
2
解决办法
3万
查看次数