“CompletionHandler”和“CompletableFuture”是 Java 异步编程的两种不同方法吗?

Tim*_*Tim 3 java asynchronous future callback

简而言之,Java 中

\n\n
\n

异步 I/O 的回调样式基于 a CompletionHandler,\n 定义了两个方法 completed()failed(),\n 操作成功或失败时将被回调。\n 如果您想要立即通知事件,则此样式非常有用以异步 I/O\xe2\x80\x94 为例,如果有大量 I/O 操作正在进行,但任何单个操作的失败并不一定是致命的。

\n
\n\n

来自http://www.deadcoderising.com/java8-writing-asynchronous-code-with-completablefuture/

\n\n
\n

除了实现Future接口之外,CompletableFuture还要实现接口CompletionStage

\n\n

ACompletionStage是一个承诺。它承诺计算最终将会完成。

\n\n

它的伟大之处CompletionStage在于它提供了大量的方法选择,允许您附加将在完成时执行的回调。

\n\n

这样我们就可以以非阻塞的方式构建系统。

\n
\n\n

CompletionHandlerCompletableFuture都可用于指定回调处理程序。

\n\n

它们有什么关系和区别?

\n\n

Java 中的异步编程有两种不同的方法CompletionHandler吗?CompletableFuture

\n\n

或者它们一起使用?

\n\n

谢谢。

\n

ace*_*ent 6

CompletionHandler<V, A>是NIO异步通道的完成接口。

它是在 Java 7 中引入的,几年前 Java 8 引入了 lambda 表达式并将其转换为函数式接口(具有单个抽象方法的接口),因此它有两个方法 和 ,而completed(V result, A attachment)不是failed(Throwable ex, A attachment)(现在)更舒适的单个方法。

CompletableFuture<T>是接口的实现CompletionStage<T>

它是在 Java 8 中引入的,因此它构建 future 的静态方法和构建 Continuation 的实例方法都使用函数式接口,您可以轻松地使用 lambda 表达式。

您可以包装每个 NIO 异步调用以使用CompletableFuture<T>

import java.lang.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;

public class AsynchronousCompletionHandler<T> implements CompletionHandler<T, CompletableFuture<T>> {
    public void completed(T result, CompletableFuture<T> attachment) {
        attachment.complete(result);
    }

    public void failed(Throwable ex, CompletableFuture<T> attachment) {
        attachment.completeExceptionally(ex);
    }

    private static final ConcurrentHashMap<Class<?>, AsynchronousCompletionHandler<?>> cache = new ConcurrentHashMap<>();

    static <T> AsynchronousCompletionHandler<T> getInstance(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        AsynchronousCompletionHandler<T> handler = (AsynchronousCompletionHandler<T>)cache.computeIfAbsent(clazz, c -> new AsynchronousCompletionHandler<T>());
        return handler;
    }

    //
    // AsynchronousByteChannel
    public static CompletableFuture<Integer> readAsync(AsynchronousByteChannel channel, ByteBuffer dst) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.read(dst, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }

    public static CompletableFuture<Integer> writeAsync(AsynchronousByteChannel channel, ByteBuffer src) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.write(src, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }

    //
    // AsynchronousFileChannel
    public static CompletableFuture<FileLock> lockAsync(AsynchronousFileChannel channel) {
        CompletableFuture<FileLock> completableFuture = new CompletableFuture<>();
        channel.lock(completableFuture, getInstance(FileLock.class));
        return completableFuture;
    }

    public static CompletableFuture<FileLock> lockAsync(AsynchronousFileChannel channel, long position, long size, boolean shared) {
        CompletableFuture<FileLock> completableFuture = new CompletableFuture<>();
        channel.lock(position, size, shared, completableFuture, getInstance(FileLock.class));
        return completableFuture;
    }

    public static CompletableFuture<Integer> readAsync(AsynchronousFileChannel channel, ByteBuffer dst, long position) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.read(dst, position, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }

    public static CompletableFuture<Integer> writeAsync(AsynchronousFileChannel channel, ByteBuffer src, long position) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.write(src, position, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }

    //
    // AsynchronousServerSocketChannel
    public static CompletableFuture<AsynchronousSocketChannel> acceptAsync(AsynchronousServerSocketChannel channel) {
        CompletableFuture<AsynchronousSocketChannel> completableFuture = new CompletableFuture<>();
        channel.accept(completableFuture, getInstance(AsynchronousSocketChannel.class));
        return completableFuture;
    }

    //
    // AsynchronousSocketChannel
    public static CompletableFuture<Void> connectAsync(AsynchronousSocketChannel channel, SocketAddress remote) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        channel.connect(remote, completableFuture, getInstance(Void.class));
        return completableFuture;
    }

    public static CompletableFuture<Long> readAsync(AsynchronousSocketChannel channel, ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        channel.read(dsts, offset, length, timeout, unit, completableFuture, getInstance(Long.class));
        return completableFuture;
    }

    public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel channel, ByteBuffer dst) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.read(dst, completableFuture, getInstance(Integer.class));
        return completableFuture;
    } 

    public static CompletableFuture<Integer> readAsync(AsynchronousSocketChannel channel, ByteBuffer dst, long timeout, TimeUnit unit) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.read(dst, timeout, unit, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }

    public static CompletableFuture<Long> writeAsync(AsynchronousSocketChannel channel, ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        channel.write(srcs, offset, length, timeout, unit, completableFuture, getInstance(Long.class));
        return completableFuture;
    }

    public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel channel, ByteBuffer src) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.write(src, completableFuture, getInstance(Integer.class));
        return completableFuture;
    } 

    public static CompletableFuture<Integer> writeAsync(AsynchronousSocketChannel channel, ByteBuffer src, long timeout, TimeUnit unit) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        channel.write(src, timeout, unit, completableFuture, getInstance(Integer.class));
        return completableFuture;
    }
}
Run Code Online (Sandbox Code Playgroud)

示例用法(仅用于说明;没有错误处理,没有资源处置):

import static AsynchronousCompletionHandler;

AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(5000));
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
acceptAsync(serverChannel)
    .thenCompose(clientChannel -> readAsync(clientChannel, buffer))
    .thenAccept(readBytes -> System.out.format("read %d bytes from client%n", readBytes));
Run Code Online (Sandbox Code Playgroud)