CompletableFuture:异步调用void函数

mad*_*die 5 java callable runnable completable-future

我试图在某些数据库异常上使用重试策略实现数据库查询.重试策略的代码不是很相关,所以我没有包含它.正如您在下面的代码中看到的那样 - 我编写了一个retryCallable,它采用了重试策略和Callable populateData().

getDataFromDB,我从DB获取数据并将数据放在全局散列图中,该散列图在应用程序级别充当缓存.

此代码按预期工作.我想populateData从另一个班级调用.但是,这将是一个阻止呼叫.由于这是数据库并且具有重试策略,因此这可能很慢.我想populateData异步调用.

我如何使用CompletableFuture或FutureTask来实现这一目标? CompletableFuture.runAsync期待一个可运行的.CompletableFuture.supplyAsync期待供应商.我以前没有实现过这些东西.所以关于最佳实践的任何建议都会有所帮助.

Class TestCallableRetry {

public void populateData() {
        final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        Set<String> data = new HashSet<>();

        data = retryCallable.call();

        if (data != null && !data.isEmpty()) {
            // store data in a global hash map
        }
    }

    private Callable<Set<Building>> getDataFromDB() {
        return new Callable<Set<String>>() {
            @Override
            public Set<String> call() {
                // returns data from database
            }
        };
    }
}

Class InvokeCallableAsynchronously {
    public void putDataInGlobalMap {
      // call populateData asynchronously
    }
}
Run Code Online (Sandbox Code Playgroud)

tep*_*pic 3

如果您将populateData方法分成两部分,一部分Supplier用于获取数据,另一部分Consumer用于存储数据,则可以轻松地将它们与CompletableFuture.

// Signature compatible with Supplier<Set<String>> 
private Set<String> fetchDataWithRetry() {
    final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
    try {
        return retryCallable.call();
    } catch (Exception e) {
        log.error("Call to database failed", e);
        return Collections.emptySet();
    }
}

// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
    if (!data.isEmpty()) {
        // store data in a global hash map
    }
}
Run Code Online (Sandbox Code Playgroud)

然后,在populateData()

private ExecutorService executor = Executors.newCachedThreadPool();

public void populateData() {
    CompletableFuture
        .supplyAsync(this::fetchDataWithRetry, executor)
        .thenAccept(this::storeData);
}
Run Code Online (Sandbox Code Playgroud)

supplyAsync需要的版本的使用Executor是可选的。如果您使用单个参数版本,您的任务将在公共池中运行;对于短时间运行的任务来说可以,但对于阻塞的任务则不行。