Java 多线程中的 Thread、Runnable 和 CompletableFuture

Jac*_*ack 4 java spring multithreading process completable-future

我正在尝试在我的 Spring Boot 应用程序中实现多线程。我只是 Java 多线程的初学者,在进行了一些搜索并阅读了各个页面上的文章后,我需要澄清以下几点。所以;

  1. 据我所知,我可以使用Thread,RunnableCompletableFuture来在 Java 应用程序中实现多线程。CompletableFuture似乎是一种更新、更清洁的方式,但Thread可能有更多优点。那么,我应该根据场景坚持使用CompletableFuture还是全部使用?

  2. 基本上我想使用以下方法向同一个服务方法发送 2 个并发请求CompletableFuture

    CompletableFuture<Integer> future1 = fetchAsync(1);
    CompletableFuture<Integer> future2 = fetchAsync(2);
    
    Integer result1 = future1.get();
    Integer result2 = future2.get();
    
    Run Code Online (Sandbox Code Playgroud)

如何同时发送这些请求,然后根据以下条件返回结果:

  • 如果第一个结果不为空,则返回结果并停止进程
  • 如果第一个结果为空,则返回第二个结果并停止进程

我怎样才能做到这一点?我应该用CompletableFuture.anyOf()它吗?

Hol*_*ger 5

CompletableFutureExecutor是一个建立在/抽象之上的工具ExecutorService,它具有处理 和Runnable的实现Thread。您通常没有理由手动处理Thread创建。如果您发现CompletableFuture不适合特定任务,您可以首先尝试其他工具/抽象。

\n

如果您想继续处理第一个(更快的)非\xe2\x80\x91null 结果,您可以使用类似

\n
CompletableFuture<Integer> future1 = fetchAsync(1);\nCompletableFuture<Integer> future2 = fetchAsync(2);\n\nInteger result = CompletableFuture.anyOf(future1, future2)\n    .thenCompose(i -> i != null?\n        CompletableFuture.completedFuture((Integer)i):\n        future1.thenCombine(future2, (a, b) -> a != null? a: b))\n    .join();\n
Run Code Online (Sandbox Code Playgroud)\n

anyOf允许您继续处理第一个结果,但无论其实际值如何。thenCombine因此,要使用第一个非\xe2\x80\x91null 结果,我们需要链接另一个操作,如果第一个结果是,则将诉诸另一个操作null。只有当两个 future 都完成时才会完成,但此时我们已经知道更快的结果是null并且需要第二个。null当两个结果都是时,整体代码仍然会产生null.

\n

请注意,anyOf接受任意类型的 future 并生成CompletableFuture<Object>. 因此,i是类型Object和类型转换。具有完整类型安全性的替代方案是

\n
CompletableFuture<Integer> future1 = fetchAsync(1);\nCompletableFuture<Integer> future2 = fetchAsync(2);\n\nInteger result = future1.applyToEither(future2, Function.identity())\n    .thenCompose(i -> i != null?\n        CompletableFuture.completedFuture(i):\n        future1.thenCombine(future2, (a, b) -> a != null? a: b))\n    .join();\n
Run Code Online (Sandbox Code Playgroud)\n

这要求我们指定一个我们不需要的函数,因此这段代码求助于Function.identity(). 您也可以仅使用i -> i来表示恒等函数;that\xe2\x80\x99s主要是一种风格选择

\n
\n

请注意,大多数复杂性源于设计,该设计试图通过始终链接要在前一阶段完成时执行的依赖操作来避免阻塞线程。上面的例子最终都遵循这个原则join()调用只是为了演示目的;如果调用者期望未来而不是被阻止,您可以轻松删除它并返回未来。

\n

如果您要执行最后的阻止join(),因为你需要立即得到结果值,你也可以使用

\n
Integer result = future1.applyToEither(future2, Function.identity()).join();\nif(result == null) {\n    Integer a = future1.join(), b = future2.join();\n    result = a != null? a: b;\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这可能更容易阅读和调试。这种易用性是即将推出的虚拟线程功能背后的动机。当操作在虚拟线程上运行时,您不需要避免阻塞调用。因此,有了这个功能,如果您仍然需要返回一个CompletableFuture而不阻塞调用者线程,您可以使用

\n
CompletableFuture<Integer> resultFuture = future1.applyToEitherAsync(future2, r-> {\n    if(r != null) return r;\n    Integer a = future1.join(), b = future2.join();\n    return a != null? a: b;\n}, Executors.newVirtualThreadPerTaskExecutor());\n
Run Code Online (Sandbox Code Playgroud)\n

通过为依赖操作请求虚拟线程,我们可以join()毫不犹豫地在函数内使用阻塞调用,这使得代码更简单,事实上,类似于之前的非异步变体。

\n
\n

在所有情况下,如果代码为 non\xe2\x80\x91null,则代码将提供更快的结果,而无需等待第二个 future 的完成。但这并不能阻止对不必要的未来的评估。CompletableFuture根本不支持停止已经正在进行的评估。您可以调用cancel(\xe2\x80\xa6)它,但这只会将未来的完成状态(结果)设置为 \xe2\x80\x9c 异常完成时使用CancellationException\xe2\x80\x9d

\n

因此,无论您cancel是否致电,已经进行的评估都将在后台继续进行,只有其最终结果将被忽略。

\n

这对于某些操作来说可能是可以接受的。如果没有,您将不得不显着改变实施fetchAsync。您可以直接使用ExecutorServicesubmit操作来获取Future支持取消中断的操作。

\n

但也要求操作\xe2\x80\x99s代码对中断敏感,才能有实际效果:

\n
    \n
  • 调用阻塞操作时,请使用那些可能中止并抛出异常InterruptedException且不会捕获并继续的方法。

    \n
  • \n
  • 当执行长时间运行的计算密集型任务时,Thread.interrupted()偶尔进行轮询并在 时退出true

    \n
  • \n
\n