CompletableFuture 是否保证运行新线程?

Dar*_*ius 6 java multithreading thread-safety completable-future

给定一些任意的上下文(例如 Junit 单元测试,而不是特定的,不一定是“主”线程)。

像这样的代码必须引入至少 2 个线程吗?

public static void main (String[] args)
{
    CompletableFuture<Void> s = new CompletableFuture<>();
    CompletableFuture<Void> f = new CompletableFuture<>();
    
    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        try{    
          System.out.println(Thread.currentThread().getId());
          CompletableFuture<String> update =
          CompletableFuture.supplyAsync(
            () -> {
              String ans = null;
              try {
                System.out.println(Thread.currentThread().getId());
                ans = "Hello";
              } catch (Exception e) {
                ans = e.toString();
              } finally {
                s.complete(null);
                return ans;
              }
            });
          s.get();
          System.out.println(s.isDone());
        } catch (Exception e) {
            System.out.println("Some error");
            return null;
        }
        return null;
    });
    
    System.out.println(f.isDone());
}
Run Code Online (Sandbox Code Playgroud)

当我们进入时s.get()someContextJVM 能否检测到它正在等待 -> 上下文切换以update完成它,然后切换回someContext?

ideone 中运行它时,它始终在两个不同的线程中运行它们,但这只是单一观察。

我想了解语言/运行时提供的保证。

dan*_*1st 5

不,这种保证不存在。

你所做的事情是不安全的。

如果您查看以下文档CompletableFuture.supplyAsync(Supplier)

返回一个新的 CompletableFuture,它由在 中运行的任务异步完成,ForkJoinPool.commonPool()其值是通过调用给定的供应商获得的。

你可以看到它使用了ForkJoinPool从 获得的ForkJoinPool.commonPool()

来自以下文档ForkJoinPool

池中的所有线程都尝试查找并执行提交给池的任务和/或由其他活动任务创建的任务(如果不存在,则最终阻塞等待工作)。当大多数任务产生其他子任务(就像大多数 ForkJoinTasks 一样),以及当许多小任务从外部客户端提交到池时,这可以实现高效的处理。特别是在构造函数中将 asyncMode 设置为 true 时,ForkJoinPools 也可能适合与从未加入的事件样式任务一起使用。所有工作线程均通过 Thread.isDaemon() 设置为 true 进行初始化。静态 commonPool() 可用且适用于大多数应用程序。公共池由任何未显式提交到指定池的 ForkJoinTask 使用。使用公共池通常会减少资源使用(其线程在不使用期间会慢慢回收,并在后续使用时恢复)。对于需要单独或自定义池的应用程序,可以使用给定的目标并行度级别构建 ForkJoinPool;默认情况下,等于可用处理器的数量

这意味着提交的任务可以在任意数量的线程(默认情况下处理器的数量)中执行,并且这些线程被重复使用。如果所有这些线程都忙,则执行可能会等待先前的执行完成。

由于公共池也可能被应用程序的其他部分使用,因此提交的任务应该很快运行并且不应该阻塞,以便其他任务可以快速执行。

虽然OpenJDK 对in进行了特殊处理ForkJoinPoolCompletableFuture#get,以确保在此期间可以执行其他任务,但其他 JDK 可能不提供此功能。

替代方案:异步处理

.get()您可能想要使用诸如 之类的方法,而不是阻止使用CompletableFuture#thenAcceptAsync(Consumer)。这将Consumer在 future 完成后运行。

此外,您还可以使用CompletionStage#exceptionally异步方式处理异常。

public static void main (String[] args) throws java.lang.Exception
{
    CompletableFuture<Void> s = new CompletableFuture();
    CompletableFuture<Void> f = new CompletableFuture();
    
    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        
          System.out.println(Thread.currentThread().getId());
          CompletableFuture<String> update =
          CompletableFuture.supplyAsync(
            () -> {
              String ans = null;
              try {
                System.out.println(Thread.currentThread().getId());
                ans = "Hello";
              } catch (Exception e) {
                ans = e.toString();
              } finally {
                s.complete(null);
                return ans;
              }
            });
          s thenSupplyAsync(result->{
              System.out.println(s.isDone());
          }).exceptionally(e->{
              System.out.println("Some error");
              return null;
          });
        
        return null;
    });
    
    System.out.println(f.isDone());
}
Run Code Online (Sandbox Code Playgroud)