Mono vs CompletableFuture

XYZ*_*XYZ 10 java reactive-programming project-reactor completable-future

CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数.假设我有一个API调用CompletableFuture.那是一个API调用阻塞吗?线程是否会被阻塞,直到它没有得到API的响应?(我知道主线程/ tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)

据我所知,单声道完全没有阻挡.

如果我错了,请详细说明并纠正我.

Ole*_*uka 17

CompletableFuture是Async.但它是非阻塞的吗?

关于CompletableFuture的一个原因是它是真正的异步,它允许您从调用程序线程和API异步运行任务,例如thenXXX允许您在结果可用时处理结果.另一方面,CompletableFuture并不总是非阻塞.例如,当您运行以下代码时,它将在默认情况下异步执行ForkJoinPool:

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});
Run Code Online (Sandbox Code Playgroud)

显然,执行任务的Threadin ForkJoinPool将最终被阻止,这意味着我们无法保证该调用将是非阻塞的.

另一方面,CompletableFuture公开API允许您使其真正无阻塞.

例如,您始终可以执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,CompletableFuture未来的API 为您提供了在不需要阻止任何线程的情况下在需要时完成执行的方法completecompleteExceptionally方法.

Mono vs CompletableFuture

在上一节中,我们概述了CF行为,但CompletableFuture和Mono之间的核心区别是什么?

值得一提的是,我们也可以阻止Mono.没有人阻止我们写下以下内容:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})
Run Code Online (Sandbox Code Playgroud)

当然,一旦我们订阅了未来,调用者线程将被阻止.但我们总是可以通过提供额外的subscribeOn运营商来解决这个问题.然而,更广泛的API Mono并不是关键的未来.

为了理解CompletableFuture和之间的主要区别Mono,让我们回到前面提到的myNonBlockingHttpCall方法实现.

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下CompletableFuture,一旦调用该方法,它将急切地执行对另一个服务/资源的HTTP调用.即使在验证一些前/后条件后我们确实不需要执行结果,它也会开始执行,并且将为此工作分配额外的CPU/DB-Connections/What-Ever-Machine-Resources.

相反,Mono根据定义,类型是懒惰的:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,在mono订阅决赛之前不会发生任何事情.因此,只有当方法Mono返回时myNonBlockingHttpCallWithMono,才会订阅,所提供的逻辑Mono.create(Consumer)将被执行.

我们可以走得更远.我们可以让我们的执行更加懒散.您可能知道,从Reactive Streams规范Mono扩展Publisher.Reactive Streams的尖叫未来是背压支持.因此,使用MonoAPI我们只有在真正需要数据时才能执行,而我们的订户已准备好使用它们:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});
Run Code Online (Sandbox Code Playgroud)

在这个例子中,我们只在订阅者调用时才执行数据Subscription#request,因为它声明它已准备好接收数据.

摘要

  • CompletableFuture 是异步的,可以是非阻塞的
  • CompletableFuture很渴望.你不能推迟执行.但你可以取消它们(这比没有好)
  • Mono是异步/非阻塞的,可以Thread通过Mono使用不同的运算符组合主,轻松地执行不同的任何调用.
  • Mono 是真正的懒惰,并允许通过订户存在推迟执行启动并准备消耗数据.

  • @XYZ ForkJoinPool 的底层机制有点聪明,所以在任务数量较多的情况下,它可以在fork中开始做另一项工作而不是阻塞,但是一旦所有工作完成,它就会开始加入任务并被阻塞最终。 (2认同)

小智 6

根据 Oleh 的答案,一个可能的懒惰解决方案CompletableFuture

public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    dispatch.thenAccept(x -> x.submit(() -> {
        myAsyncHttpClient.execute(someData, (result, exception -> {
            if(exception != null) {
                uncompletedFuture.completeExceptionally(exception);
                return;
            }
            uncompletedFuture.complete(result);
        })
    }));

    return uncompletedFuture;
}
Run Code Online (Sandbox Code Playgroud)

然后,稍后你只需做

dispatch.complete(executor);
Run Code Online (Sandbox Code Playgroud)

我猜这相当于CompletableFutureMono但没有背压。