ruh*_*hul 6 java reactor reactive-programming apache-commons-httpclient
从Mono#block() 的官方文档中可以看出:
订阅此 Mono 并无限期阻塞,直到收到下一个信号。返回该值,如果 Mono 完成为空,则返回 null。如果 Mono 出错,则会抛出原始异常(如果是受检查的异常,则将其包装在 RuntimeException 中)。
因此可以肯定 block() 方法是阻塞的,并且在解决之前它不会执行下一行block()。
但我的困惑是,当我使用时,toFuture()期望它是非阻塞的,但它的行为与block方法完全相同。在Mono#toFuture() 的文档中指出:
将此 Mono 转换为 CompletableFuture 在 onNext 或 onComplete 上完成并在 onError 上失败。
不太清楚。本文档中没有任何地方说Mono#toFuture()阻塞。
toFuture()方法是阻塞还是非阻塞?CompletableFuture?更新:添加了代码片段
使用Mono.block()方法:
long time = System.currentTimeMillis();
String block = Mono.fromCallable(() -> {
logger.debug("inside in fromCallable() block()");
//Upstream httpcall with apache httpClient().
// which takes atleast 1sec to complete.
return "Http response as string";
}).block();
logger.info("total time needed {}", (System.currentTimeMillis()-time));
return CompletableFuture.completedFuture(block);
Run Code Online (Sandbox Code Playgroud)
使用Mono.ToFuture()方法:
long time = System.currentTimeMillis();
CompletableFuture<String> toFuture = Mono.fromCallable(() -> {
logger.debug("inside in fromCallable() block()");
//Upstream httpcall with apache httpClient().
// which takes atleast 1sec to complete.
return "Http response as string";
}).toFuture();
logger.info("total time needed {}", (System.currentTimeMillis()-time));
return toFuture;
Run Code Online (Sandbox Code Playgroud)
这两个代码片段的行为完全相同。
——编辑:我错了。mono.toFuture() 不会阻塞——
mono.toFuture() 不阻塞。看看这个测试:
@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
System.out.println(LocalTime.now() + ": start");
Mono<String> mono = Mono.just("hello StackOverflow")
.delayElement(Duration.ofMillis(500))
.doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
Future<String> future = mono.toFuture();
System.out.println(LocalTime.now() + ": future created");
String result = future.get();
System.out.println(LocalTime.now() + ": future completed");
assertThat(result).isEqualTo("hello StackOverflow");
}
Run Code Online (Sandbox Code Playgroud)
这是结果:
20:18:49.557: start
20:18:49.575: future created
20:18:50.088: mono completed
20:18:50.088: future completed
Run Code Online (Sandbox Code Playgroud)
未来几乎立刻就被创造出来了。半秒后,单声道完成,紧接着,未来完成。这正是我所期望发生的事情。
那么为什么单声道在问题中提供的示例中看起来会阻塞呢?这是因为 mono.fromCallable() 的工作方式。该可调用实际运行的时间和地点?mono.fromCallable() 不会产生额外的线程来完成这项工作。从我的测试来看,当您第一次在单声道上调用 subscribe() 或 block() 或类似的东西时,可调用似乎会运行,并且它将在执行此操作的线程中运行。
这是一个测试,表明如果您使用 fromCallable() 创建一个 mono, subscribe 将导致 callable 在主线程中执行,甚至 subscribe() 方法也会看起来阻塞。
@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
System.out.println(LocalTime.now() + ": start");
System.out.println("main thread: " + Thread.currentThread().getName());
Mono<String> mono = Mono.fromCallable(() -> {
System.out.println("callabel running in thread: " + Thread.currentThread().getName());
Thread.sleep(1000);
return "Hello StackOverflow";
})
.doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
System.out.println("before subscribe");
mono.subscribe(System.out::println);
System.out.println(LocalTime.now() + ": after subscribe");
}
Run Code Online (Sandbox Code Playgroud)
结果:
20:53:37.071: start
main thread: main
before subscribe
callabel running in thread: main
20:53:38.099: mono completed
Hello StackOverflow
20:53:38.100: after subscribe
Run Code Online (Sandbox Code Playgroud)
结论: mono.toFuture() 并不比 mono.subscribe() 更阻塞。如果您想异步执行某些代码,则不应使用 Mono.fromCallable()。您可以考虑使用 Executors.newSingleThreadExecutor().submit(someCallable)
作为参考,这是我最初的(错误的)答案,我贬低了 mono.block() 方法,该方法肯定是由比我更了解 Java 和编码的人编写的。我想这是关于谦卑的个人教训。
下面的一切都是废话
我想验证它到底是如何工作的,所以我编写了一些测试。不幸的是,事实证明 mono.toFuture() 确实是阻塞的,并且结果是同步计算的。老实说,我不知道你为什么会使用这个功能。Future 的全部意义在于保存异步评估的结果。
这是我的测试:
@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
Mono<Integer> mono = Mono.fromCallable(() -> {
System.out.println("start mono");
Thread.sleep(1000);
System.out.println("mono completed");
return 0;
});
Future<Integer> future = mono.toFuture();
System.out.println("future created");
future.get();
System.out.println("future completed");
}
Run Code Online (Sandbox Code Playgroud)
结果:
start mono
mono completed
future created
future completed
Run Code Online (Sandbox Code Playgroud)
这是 monoToFuture() 的实现,它按照我期望的方式工作:
@Test
void testMonoToFuture() throws ExecutionException, InterruptedException {
Mono<Integer> mono = Mono.fromCallable(() -> {
System.out.println("start mono");
Thread.sleep(1000);
System.out.println("mono completed");
return 0;
});
Future<Integer> future = monoToFuture(mono, Executors.newSingleThreadExecutor());
System.out.println("future created");
future.get();
System.out.println("future completed");
}
private <T> Future<T> monoToFuture(Mono<T> mono, ExecutorService executorService){
return executorService.submit((Callable<T>) mono::block);
}
Run Code Online (Sandbox Code Playgroud)
结果:
future created
start mono
mono completed
future completed
Run Code Online (Sandbox Code Playgroud)
TL;DR
Mono.toFuture()不是阻塞,而是Mono.toFuture().get()阻塞。block()从技术上讲是相同的toFuture().get()并且都是阻塞的。
Mono.toFuture()只需订阅它并立即解决即可转换Mono为 a 。CompletableFuture但这并不意味着您可以在此之后访问String相应的结果(在您的情况下)Mono。CompletableFuture仍然是异步的,您可以使用thenApply(), thenCompose(), thenCombine(), ... 等方法继续异步处理。
CompletableFuture<Double> result = getUserDetail(userId)
.toFuture()
.thenCompose(user -> getCreditRating(user));
Run Code Online (Sandbox Code Playgroud)
其中getUserDetail定义为
Mono<User> getUserDetail(String userId);
Run Code Online (Sandbox Code Playgroud)
Mono.toFuture当您需要组合不同的异步 API 时非常有用。例如,AWS Java v2 API 是异步的,但基于CompletableFuture但我们可以使用Mono.toFuture或组合 API Mono.fromFuture。
| 归档时间: |
|
| 查看次数: |
9759 次 |
| 最近记录: |