如何在CompletionStage中异常地链接非阻塞动作

tkr*_*use 4 java asynchronous exception-handling java-8 completable-future

我正在用Java编写Play2应用程序服务方法,该方法应执行以下操作。异步调用方法A,如果失败,则异步调用方法B。

为了说明,假设该接口用于服务调用的后端:

public interface MyBackend {
    CompletionStage<Object> tryWrite(Object foo);
    CompletionStage<Object> tryCleanup(Object foo);
}
Run Code Online (Sandbox Code Playgroud)

因此,在我的服务方法中,我想返回一个可以完成以下操作的Future:

  • tryWrite成功完成
  • tryWrite失败和tryCleanup成功完成,但tryWrite()例外,失败

(注意:当然tryWrite()本身可以进行任何清理,这是一个简化的示例来说明问题)

在我看来,像这样调用后端的服务的实现似乎很困难,因为CompletionStage.exceptionally()方法不允许进行组合。

版本1:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        CompletionStage<Object> writeFuture = myBackend.tryWrite(foo)
            .exceptionally((throwable) -> {
                CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo);
                throw new RuntimeException(throwable);
        });
        return writeFuture;
    }
}
Run Code Online (Sandbox Code Playgroud)

因此,版本1以非阻塞方式调用tryCleanup(foo),但tryWriteWithCleanup()返回的CompletionStage不会等待cleanupFuture完成。如何更改此代码以从也将等待cleanupFuture完成的服务返回将来?

版本2:

public class MyServiceImpl {
    public CompletionStage<Object> tryWriteWithCleanup(Object foo) {

        final AtomicReference<Throwable> saveException = new AtomicReference<>();
        CompletionStage<Object> writeFuture = myBackend
            .tryWrite(foo)
            .exceptionally(t -> {
                saveException.set(t);
                // continue with cleanup
                return null;
            })
            .thenCompose((nil) -> {
                // if no cleanup necessary, return
                if (saveException.get() == null) {
                    return CompletableFuture.completedFuture(null);
                }
                return CompletionStage<Object> cleanupFuture = myBackend.tryCleanup(foo)
                    .exceptionally(cleanupError -> {
                        // log error
                        return null;
                    })
                    .thenRun(() -> {
                        throw saveException.get();
                    });
        });
        return writeFuture;
    }
}
Run Code Online (Sandbox Code Playgroud)

Version2使用外部AtomicReference来存储故障,如果发生故障,则在另一个thenCompose()块中进行异步第二次调用。

我所做的所有其他尝试都变得笨拙,以至于我不想将其粘贴到此处。

Did*_*r L 8

不幸的是CompletionStage/ CompletableFuture不提供带有组合的异常处理API。

您可以通过依赖于返回a的handle()a BiFunction来解决此问题CompletionStage。这会给你的嵌套阶段(CompletionStage<CompletionStage<Object>>您可以在“)UNNEST ”使用compose(identity())

public CompletionStage<Object> tryWriteWithCleanup(Object foo) {
    return myBackend.tryWrite(foo)
            .handle((r, e) -> {
                if (e != null) {
                    return myBackend.tryCleanup(foo)
                            .handle((r2, e2) -> {
                                // Make sure we always return the original exception
                                // but keep track of new exception if any,
                                // as if run in a finally block
                                if (e2 != null) {
                                    e.addSuppressed(e2);
                                }
                                // wrapping in CompletionException  behaves as if
                                // we threw the original exception
                                throw new CompletionException(e);
                            });
                }
                return CompletableFuture.completedFuture(r);
            })
            .thenCompose(Function.identity());
}
Run Code Online (Sandbox Code Playgroud)