Mono.toFuture() 是阻塞的吗?

ruh*_*hul 6 java reactor reactive-programming apache-commons-httpclient

Mono#block() 的官方文档中可以看出:

订阅此 Mono 并无限期阻塞,直到收到下一个信号。返回该值,如果 Mono 完成为空,则返回 null。如果 Mono 出错,则会抛出原始异常(如果是受检查的异常,则将其包装在 RuntimeException 中)。

因此可以肯定 block() 方法是阻塞的,并且在解决之前它不会执行下一行block()

但我的困惑是,当我使用时,toFuture()期望它是非阻塞的,但它的行为与block方法完全相同。在Mono#toFuture() 的文档中指出:

将此 Mono 转换为 CompletableFuture 在 onNext 或 onComplete 上完成并在 onError 上失败。

单声道#toFuture()

不太清楚。本文档中没有任何地方说Mono#toFuture()阻塞

  1. 请确认我的toFuture()方法是阻塞还是非阻塞?
  2. 另外,如果它是非阻塞的,那么哪个线程将负责执行里面的代码CompletableFuture

更新:添加了代码片段

使用Mono.block()方法:

    long time = System.currentTimeMillis();
    String block = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).block();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));

    return CompletableFuture.completedFuture(block);
Run Code Online (Sandbox Code Playgroud)

使用Mono.ToFuture()方法:

    long time = System.currentTimeMillis();
    CompletableFuture<String> toFuture = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).toFuture();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));
    return toFuture;
Run Code Online (Sandbox Code Playgroud)

这两个代码片段的行为完全相同。

Joo*_*gts 5

——编辑:我错了。mono.toFuture() 不会阻塞——

mono.toFuture() 不阻塞。看看这个测试:

    @Test
    void testMonoToFuture() throws ExecutionException, InterruptedException {
        System.out.println(LocalTime.now() + ": start");
        Mono<String> mono = Mono.just("hello StackOverflow")
            .delayElement(Duration.ofMillis(500))
            .doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
        Future<String> future = mono.toFuture();
        System.out.println(LocalTime.now() + ": future created");
        String result = future.get();
        System.out.println(LocalTime.now() + ": future completed");
        assertThat(result).isEqualTo("hello StackOverflow");
    }
Run Code Online (Sandbox Code Playgroud)

这是结果:

20:18:49.557: start
20:18:49.575: future created
20:18:50.088: mono completed
20:18:50.088: future completed
Run Code Online (Sandbox Code Playgroud)

未来几乎立刻就被创造出来了。半秒后,单声道完成,紧接着,未来完成。这正是我所期望发生的事情。

那么为什么单声道在问题中提供的示例中看起来会阻塞呢?这是因为 mono.fromCallable() 的工作方式。该可调用实际运行的时间和地点?mono.fromCallable() 不会产生额外的线程来完成这项工作。从我的测试来看,当您第一次在单声道上调用 subscribe() 或 block() 或类似的东西时,可调用似乎会运行,并且它将在执行此操作的线程中运行。

这是一个测试,表明如果您使用 fromCallable() 创建一个 mono, subscribe 将导致 callable 在主线程中执行,甚至 subscribe() 方法也会看起来阻塞。

    @Test
    void testMonoToFuture() throws ExecutionException, InterruptedException {
        System.out.println(LocalTime.now() + ": start");
        System.out.println("main thread: " + Thread.currentThread().getName());
        Mono<String> mono = Mono.fromCallable(() -> {
                System.out.println("callabel running in thread: " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return "Hello StackOverflow";
            })
            .doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
        System.out.println("before subscribe");
        mono.subscribe(System.out::println);
        System.out.println(LocalTime.now() + ": after subscribe");
    }
Run Code Online (Sandbox Code Playgroud)

结果:

20:53:37.071: start
main thread: main
before subscribe
callabel running in thread: main
20:53:38.099: mono completed
Hello StackOverflow
20:53:38.100: after subscribe
Run Code Online (Sandbox Code Playgroud)

结论: mono.toFuture() 并不比 mono.subscribe() 更阻塞。如果您想异步执行某些代码,则不应使用 Mono.fromCallable()。您可以考虑使用 Executors.newSingleThreadExecutor().submit(someCallable)

作为参考,这是我最初的(错误的)答案,我贬低了 mono.block() 方法,该方法肯定是由比我更了解 Java 和编码的人编写的。我想这是关于谦卑的个人教训。

下面的一切都是废话

我想验证它到底是如何工作的,所以我编写了一些测试。不幸的是,事实证明 mono.toFuture() 确实是阻塞的,并且结果是同步计算的。老实说,我不知道你为什么会使用这个功能。Future 的全部意义在于保存异步评估的结果。

这是我的测试:

@Test
  void testMonoToFuture() throws ExecutionException, InterruptedException {
    Mono<Integer> mono = Mono.fromCallable(() -> {
      System.out.println("start mono");
      Thread.sleep(1000);
      System.out.println("mono completed");
      return 0;
    });
    Future<Integer> future = mono.toFuture();
    System.out.println("future created");
    future.get();
    System.out.println("future completed");
  }
Run Code Online (Sandbox Code Playgroud)

结果:

start mono
mono completed
future created
future completed
Run Code Online (Sandbox Code Playgroud)

这是 monoToFuture() 的实现,它按照我期望的方式工作:

@Test
  void testMonoToFuture() throws ExecutionException, InterruptedException {
    Mono<Integer> mono = Mono.fromCallable(() -> {
      System.out.println("start mono");
      Thread.sleep(1000);
      System.out.println("mono completed");
      return 0;
    });
    Future<Integer> future = monoToFuture(mono, Executors.newSingleThreadExecutor());
    System.out.println("future created");
    future.get();
    System.out.println("future completed");
  }

  private <T> Future<T> monoToFuture(Mono<T> mono, ExecutorService executorService){
    return executorService.submit((Callable<T>) mono::block);
  }
Run Code Online (Sandbox Code Playgroud)

结果:

future created
start mono
mono completed
future completed
Run Code Online (Sandbox Code Playgroud)


Ale*_*lex 3

TL;DR Mono.toFuture()不是阻塞,而是Mono.toFuture().get()阻塞。block()从技术上讲是相同的toFuture().get()并且都是阻塞的。

Mono.toFuture()只需订阅它并立即解决即可转换Mono为 a 。CompletableFuture但这并不意味着您可以在此之后访问String相应的结果(在您的情况下)MonoCompletableFuture仍然是异步的,您可以使用thenApply(), thenCompose(), thenCombine(), ... 等方法继续异步处理。

CompletableFuture<Double> result = getUserDetail(userId)
    .toFuture()
    .thenCompose(user -> getCreditRating(user));
Run Code Online (Sandbox Code Playgroud)

其中getUserDetail定义为

Mono<User> getUserDetail(String userId);
Run Code Online (Sandbox Code Playgroud)

Mono.toFuture当您需要组合不同的异步 API 时非常有用。例如,AWS Java v2 API 是异步的,但基于CompletableFuture但我们可以使用Mono.toFuture或组合 API Mono.fromFuture