CompletableFuture 从内部注入

xma*_*s79 3 java concurrency asynchronous future java-8

是否可以从内部在链中注入a ?CompletableFutureCompletableFuture

我正在使用这样的函数:

public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // ... Some processing here ...
        if (somecondition failed)
            return false; // Task failed!

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        if (!some condition satisfied) {
            // This is where I want the injection to happen. 
            // This stage should be suspended and a new stage should be injected between this point and the next stage.
        }

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        return true; // OK
    });

    // This is the result we have to wait for.
    return future;
}
Run Code Online (Sandbox Code Playgroud)

在注入点if (!some condition satisfied),我想运行一个查询(比如)需要 5 秒来执行和检索最后阶段所需的一些数据。我不想阻塞线程 5 秒,例如在 . 内部使查询同步if,我希望它异步运行,当结果返回时直接进入下一阶段。我遇到的问题是条件仅内部已知。

有人对此有想法吗?


编辑

我会尽力澄清这个问题。我本来一条唯一的代码。现在我正在尝试优化代码,以便生成较少数量的线程。

关键是在注入点我想发出类似的东西(对不起,Cassandra代码片段的Datastax Java 驱动程序):

ResultSetFuture rsFuture = session.executeAsync(query);
Run Code Online (Sandbox Code Playgroud)

并注入未来进入连锁。这将使调用线程“自由”执行其他操作,而不是坐着等待结果。


我不知道我是否可以比这更清楚,但让我们按照这个例子。

我在主线程中运行一个循环:

for (int i = 0; i < 1000; i++) {
    getFutureOfMyLongRunningTask(i);
}
Run Code Online (Sandbox Code Playgroud)

这个循环只存在于主线程上,但每次调用该函数都会在线程池P 中加入一个新任务。现在假设P是大小为1的固定线程池。这意味着P 中只存在一个线程,并且它只能处理1 个任务。然而,主循环会将所有 1000 个任务排入队列。然后主循环将需要等待所有任务完成。

现在假设1000 个任务中的第一个任务需要执行长数据库查询。我们现在有两个选择:

  1. 查询在处理线程(属于线程池P同步 执行。这意味着我只是在块内发出查询并等待结果。这有效地阻止了任务处理,因为线程池P没有空闲线程。唯一的一个是在 IO 上阻塞if (!some condition satisfied)

  2. 查询在处理线程(属于线程池P异步 执行。这意味着我在块内发出查询并立即返回我将侦听的未来(可能数据库驱动程序将产生另一个线程并阻止线程等待结果)。但是,现在属于P的线程可以自由地处理至少另一个任务。if (!some condition satisfied)

在我看来,选项2比选项1更好,同样的推理可以应用于大小 > 1或动态大小的线程池。

我想要的只是保持线程池尽可能空闲以生成最少数量的线程以避免浪费资源。

希望这是有道理的。如果没有,请您解释一下我错在哪里?

Mat*_*all 5

而不是 using thenApplyAsync, use thenComposeor thenComposeAsync,这让函数返回 aCompletableFuture<Foo>而不是 a Foo。取而代之的return true,如果some condition 满足,你需要return CompletableFuture.completedFuture(true)

public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // ... Some processing here ...
        if (somecondition failed)
            return false; // Task failed!

        return true; // OK
    }).thenComposeAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return CompletableFuture.completedFuture(false);

        // ... Some processing here ...

        if (!some condition satisfied) {
            return runSomeOtherQuery()
        }

        return CompletableFuture.completedFuture(true); // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        return true; // OK
    });

    // This is the result we have to wait for.
    return future;
}


public CompletableFuture<Boolean> runSomeOtherQuery() {
    ....
}
Run Code Online (Sandbox Code Playgroud)