Does the call to join in the following CompletableFuture example block the process?

Gio*_*kis 5 java nonblocking completable-future

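I am trying to understand CompletableFutures and chains of calls that return completed futures, so I created the example below, which simulates two calls to a database.

The first method is supposed to provide a completed future containing a list of userIds; then I need to call another method that takes a userId to fetch the user (in this case a String).

To summarize:

  1. Fetch the ids.
  2. Fetch the list of users corresponding to those ids.

I created simple methods that simulate the responses by putting the thread to sleep. Please check the code below: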

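import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PipelineOfTasksExample {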

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());
        return cfId;
    }

    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenCompose(listOfIds ->
                        CompletableFuture.supplyAsync(
                                () -> listOfIds.parallelStream()
                                        .map(id -> example.fetchById(id).join())
                                        .collect(Collectors.toList()
                                        )
                        )
                );

        System.out.println(result.join());
    }

}

My question is: does the join call (example.fetchById(id).join()) break the non-blocking nature of the process? If so, how can I fix it?

Thanks in advance.

Hol*_*ger 10

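Your example is a bit odd: returnUserIdsFromDb() slows down the main thread before any operation has even started, and likewise fetchById slows down the caller rather than the asynchronous operation, which defeats the whole purpose of asynchronous operations.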
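
To make the point about where the delay lives concrete, here is a minimal sketch. The class and method names are hypothetical and the 500 ms delay is an arbitrary assumption. delayInCaller sleeps before creating the future, like the question's returnUserIdsFromDb(), so the caller pays the cost; delayInTask moves the same sleep into the supplyAsync supplier, so the call returns almost immediately and only a later join() waits.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;

class WhereTheDelayGoes {

    // Hypothetical slow lookup; 500 ms is an arbitrary stand-in for database latency.
    static String slowLookup(long id) {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user" + id;
    }

    // Mirrors the question: the caller sleeps, the future is created afterwards.
    static CompletableFuture<String> delayInCaller(long id) {
        String value = slowLookup(id);
        return CompletableFuture.completedFuture(value);
    }

    // The sleep runs inside the asynchronous task; the caller returns at once.
    static CompletableFuture<String> delayInTask(long id) {
        return CompletableFuture.supplyAsync(() -> slowLookup(id));
    }

    // Measures only the call itself, then joins so no work is left running.
    static long callMillis(LongFunction<CompletableFuture<String>> f) {
        long start = System.nanoTime();
        CompletableFuture<String> cf = f.apply(1L);
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        cf.join();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("delay in caller: " + callMillis(WhereTheDelayGoes::delayInCaller) + " ms");
        System.out.println("delay in task:   " + callMillis(WhereTheDelayGoes::delayInTask) + " ms");
    }
}
```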

Further, instead of .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …)) you can simply use .thenApplyAsync(listOfIds -> …).
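
A small sketch of that equivalence, assuming an arbitrary two-thread executor and using the list size as a stand-in for the real work (all names here are illustrative): both variants run the function asynchronously on the same executor and produce the same result, but thenApplyAsync avoids creating an extra future only to flatten it again with thenCompose.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ComposeVsApplyAsync {

    static CompletableFuture<List<Long>> ids(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L), pool);
    }

    // The question's shape: a new future is created inside thenCompose just to be flattened.
    static CompletableFuture<Integer> viaCompose(ExecutorService pool) {
        return ids(pool).thenCompose(list ->
                CompletableFuture.supplyAsync(list::size, pool));
    }

    // Same effect: the function itself is run asynchronously on the given executor.
    static CompletableFuture<Integer> viaApplyAsync(ExecutorService pool) {
        return ids(pool).thenApplyAsync(List::size, pool);
    }

    static List<Integer> runBoth() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return Arrays.asList(viaCompose(pool).join(), viaApplyAsync(pool).join());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoth()); // [4, 4]
    }
}
```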

So a better example might be:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenApplyAsync(listOfIds ->
                listOfIds.parallelStream()
                    .map(id -> example.fetchById(id).join())
                    .collect(Collectors.toList()
                ),
                POOL
            );
        System.out.println(result.join());
    }
}

Running it prints, for each step, the name of the thread that triggers or completes it, and for every completed step also the current number of threads in POOL.

At first glance, the number of threads may be surprising.

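The answer is that join() may block the thread, but if this happens inside a worker thread of a Fork/Join pool, the situation is detected and a new compensation thread is started to maintain the configured target parallelism.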
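
The following sketch makes this visible under an assumed ForkJoinPool with a parallelism of 1 (chosen to make the effect obvious; the class name is hypothetical). The pool's only worker blocks in join() on a task submitted to the same pool. Because the wait happens in a Fork/Join worker, the pool can compensate (or, depending on the JDK version, run the pending task itself) instead of deadlocking. The pool size printed after the join may differ between JDK versions and runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CompensationDemo {

    static String nestedJoin() {
        ForkJoinPool pool = new ForkJoinPool(1); // target parallelism: one worker
        try {
            return CompletableFuture.supplyAsync(() -> {
                // Submitted from the pool's only worker to the same pool.
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "inner done", pool);
                // Blocking join() inside a Fork/Join worker: the pool notices the
                // blocked worker and keeps making progress.
                String value = inner.join();
                System.out.println("pool size after join: " + pool.getPoolSize());
                return value;
            }, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedJoin()); // inner done
    }
}
```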

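As a special case, when the default Fork/Join pool is used, the implementation may pick up new pending tasks within the join() method, to ensure progress within the same thread.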

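So the code will always make progress, and an occasional join() call is fine when the alternatives are much more complicated. If used excessively, however, it risks consuming too many resources; after all, the reason for using a thread pool is to limit the number of threads.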

The alternative is to use chained dependent operations where possible.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenComposeAsync(listOfIds -> {
                List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
                    .map(id -> example.fetchById(id))
                    .collect(Collectors.toList());
                return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                    .thenApply(_void -> jobs.stream()
                        .map(CompletableFuture::join).collect(Collectors.toList()));
            },
                POOL
            );

        System.out.println(result.join());
        System.out.println(ForkJoinPool.commonPool().getPoolSize());
    }
}

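The difference is that all asynchronous jobs are submitted first, and then a dependent action that calls join on them is scheduled; it runs only once all jobs have completed, so these join calls never block. Only the final join call at the end of the main method may block the main thread.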
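
The pattern can be factored into a small reusable helper. allAsList is a hypothetical name, not a JDK method; it only combines CompletableFuture.allOf with thenApply as described above, so every join() inside it runs after all jobs are done. The demo uses the default executor for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllAsList {

    // Hypothetical helper: turns a list of futures into a future of a list.
    // The join() calls run inside thenApply, i.e. only after allOf has completed,
    // so none of them can block. Result order follows the order of 'jobs'.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> jobs) {
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> jobs.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> jobs = Arrays.asList(1L, 2L, 3L, 4L).stream()
                .map(id -> CompletableFuture.supplyAsync(() -> "user" + id))
                .collect(Collectors.toList());
        return allAsList(jobs).join(); // the only potentially blocking call
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [user1, user2, user3, user4]
    }
}
```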

So this prints something like:

trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
[user1, user2, user3, user4]

This shows that no compensation threads had to be created, so the number of threads matches the configured target parallelism.

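Note that if the actual work is done in the background threads rather than within the fetchById method itself, you no longer need a parallel stream, as there are no blocking join() calls anymore. For such scenarios, simply using stream() usually yields better performance.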
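
A sketch of that variant under assumed parameters (a four-thread fixed pool and a hypothetical 300 ms slowFetch standing in for the database call): since each map step only submits a task and returns, a sequential stream() launches all jobs almost instantly, and the jobs still overlap in the pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class SequentialSubmit {

    // Hypothetical stand-in for a slow database lookup (300 ms is arbitrary).
    static String slowFetch(long id) {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user" + id;
    }

    static List<String> fetchAll(List<Long> ids, ExecutorService pool) {
        // Plain sequential stream: each step only submits a task, nothing blocks here.
        List<CompletableFuture<String>> jobs = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> slowFetch(id), pool))
                .collect(Collectors.toList());
        // Dependent action runs only after every job is done, so its joins never block.
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> jobs.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            List<String> users = fetchAll(Arrays.asList(1L, 2L, 3L, 4L), pool);
            long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(users + " in " + ms + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With four threads and four jobs, the total time should be close to a single 300 ms delay rather than the sum of all four.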

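  • @ThomasLang, when you use join() in a worker thread of a bounded thread pool (other than a ForkJoinPool), you can indeed bring it to a complete standstill. In other words, using join() with other executors is more dangerous than with a ForkJoinPool. The solution would be to use virtual threads, but I suppose that, like most of us, you can't use Java 21 in production code yet… (2 upvotes)
  • Yes, it's even the default; when you don't specify an executor, the [common pool](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ForkJoinPool.html#commonPool()) will be used. But you can also specify your own ForkJoinPool as the executor. (2 upvotes)
  • @ThomasLang a) Only when your own executor is not an instance of ForkJoinPool and you make a lot of join() or get() calls. But keep in mind that this is nothing special; the behavior is the same as in any scenario where you make lots of blocking calls within a thread pool. The actual difference is that there is a workaround when you make blocking calls on a CompletableFuture in a worker thread of a ForkJoinPool. b) As said above, making lots of blocking calls within a thread pool is a general problem that can lead to starvation. (2 upvotes)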
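
To illustrate the starvation described in these comments, here is a sketch assuming a single-thread fixed pool (chosen to force the problem; the class name is hypothetical). The outer task occupies the only thread while it waits for an inner task queued behind it on the same executor, so the inner task can never start. A timed get() stands in for join() so the demo terminates; with join() it would hang forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedPoolStarvation {

    static String nestedWait(long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(1); // not a ForkJoinPool
        try {
            return CompletableFuture.supplyAsync(() -> {
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "inner done", pool);
                try {
                    // No compensation here: the only thread is busy waiting.
                    return inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "starved"; // the inner task never got a thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedWait(200)); // starved
    }
}
```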