Sri*_*nth 4 java multithreading asynchronous completable-future completion-stage
Here is a short code version of the problem I am facing:
public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
        /*
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ignored) {}
        */
        //System.out.println("supplyAsync: " + Thread.currentThread().getName());
        return 1;
    })
    .thenApply(i -> {
        System.out.println("apply: " + Thread.currentThread().getName());
        return i + 1;
    })
    .thenAccept((i) -> {
        System.out.println("accept: " + Thread.currentThread().getName());
        System.out.println("result: " + i);
    }).join();
}
This is the output I get:
apply: main
accept: main
result: 2
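I was surprised to see main there! I expected something like the following, which is what happens when I uncomment the Thread.sleep() call, or even just the single sysout statement: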
supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2
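I understand that thenApplyAsync() would make sure it does not run on the main thread, but I want to avoid handing the data returned by the supplier over from the thread that ran supplyAsync to the thread that is going to run thenApply and the other subsequent then stages in the chain.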
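The method thenApply evaluates the function in the caller's thread because the future has already been completed. Of course, when you insert a sleep into the supplier, the future has not been completed by the time thenApply is called. Even a print statement might slow down the supplier enough for the main thread to invoke thenApply and thenAccept first. But this is not reliable behavior; you may get different results when running the code repeatedly.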
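To take the timing out of the picture, the already-completed case can be forced deterministically. The following is a minimal sketch, not part of the original question; the class and method names are made up for illustration, and the observed behavior is that of the OpenJDK implementation rather than a documented guarantee. It waits for the future first and only then chains thenApply, so the function is evaluated by the chaining thread.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, introduced only for this illustration.
class CompletedFutureDemo {

    // Returns the name of the thread that evaluated the thenApply function
    // when the future was already completed at the time of chaining.
    static String applyThreadWhenAlreadyCompleted() {
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 1);
        cf.join(); // block until the supplier has finished, so cf is completed

        // cf is complete here, so the function runs right away in this thread.
        return cf.thenApply(i -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("apply:  " + applyThreadWhenAlreadyCompleted());
    }
}
```

Both lines should report the same thread name, regardless of how fast or slow the supplier is.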
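Not only does the future not remember which thread completed it, there is also no way to tell an arbitrary thread to execute particular code. The thread might be busy with something else, be entirely uncooperative, or even have terminated in the meantime.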
Just consider

ExecutorService s = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync: " + Thread.currentThread().getName());
    return 1;
}, s);
s.shutdown();
s.awaitTermination(1, TimeUnit.DAYS);
cf.thenApply(i -> {
    System.out.println("apply: " + Thread.currentThread().getName());
    return i + 1;
})
.thenAccept((i) -> {
    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

How could we expect the functions passed to thenApply and thenAccept to be executed in the worker thread of a pool that has already terminated?
We could also write

CompletableFuture<Integer> cf = new CompletableFuture<>();

Thread t = new Thread(() -> {
    System.out.println("completing: " + Thread.currentThread().getName());
    cf.complete(1);
});
t.start();
t.join();

System.out.println("completer: " + t.getName() + " " + t.getState());
cf.thenApply(i -> {
    System.out.println("apply: " + Thread.currentThread().getName());
    return i + 1;
})
.thenAccept((i) -> {
    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

which will print something like
completing: Thread-0
completer: Thread-0 TERMINATED
apply: main
accept: main
result: 2

Obviously, we can't insist on this thread processing the subsequent stages.
But even when the thread is a still-alive worker thread of a pool, it doesn't know that it has completed a future, nor does it have a notion of "processing subsequent stages". Following the Executor abstraction, it has just received an arbitrary Runnable from the queue, and after processing it, it proceeds with its main loop, fetching the next Runnable from the queue.
So once the first future has been completed, the only way to tell it to do the work of completing other futures is by enqueuing the tasks. This is what happens when using thenApplyAsync specifying the same pool or performing all actions with the …Async methods without an executor, i.e. using the default pool.
When you use a single-threaded executor for all …Async methods, you can be sure that all actions are executed by the same thread, but they will still go through the pool's queue. Since even then, it's the main thread that actually enqueues the dependent actions in the case of an already completed future, a thread-safe queue, and hence synchronization overhead, is unavoidable.
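The single-threaded executor variant described above can be sketched as follows. This is an illustrative example only; the class name, method name, and the list used to record thread names are assumptions and not part of the original answer. Every stage is submitted to the same one-thread pool, so all three recorded thread names are identical, while each stage still passes through the pool's queue.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, introduced only for this illustration.
class SingleThreadChainDemo {

    // Runs supply -> apply -> accept through one single-threaded executor and
    // returns the names of the threads that executed the three stages.
    static List<String> runChain() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> threads = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture.supplyAsync(() -> {
                threads.add(Thread.currentThread().getName());
                return 1;
            }, pool)
            .thenApplyAsync(i -> {
                threads.add(Thread.currentThread().getName());
                return i + 1;
            }, pool)
            .thenAcceptAsync(i -> threads.add(Thread.currentThread().getName()), pool)
            .join(); // wait for the last stage before shutting the pool down
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        // Prints the same pool thread name three times.
        System.out.println(runChain());
    }
}
```

Note that the data still changes hands through the executor's queue between stages; the same thread merely happens to pick up each task.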
But note that even if you manage to create the chain of dependent actions first, before a single worker thread processes them all sequentially, this overhead is still there. Each future's completion is done by storing the new state in a thread-safe way, making the result potentially visible to all other threads, and atomically checking whether a concurrent completion (e.g. a cancellation) has happened in the meantime. Then, the dependent action(s) chained by other threads will be fetched, of course in a thread-safe way, before they are executed.
All these actions with synchronization semantics make it unlikely that there are benefits of processing the data by the same thread when having a chain of dependent CompletableFutures.
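The "create the chain first" scenario can be sketched like this. It is only an illustration under stated assumptions: the class name, the thread name "worker", and the recording list are made up, and the behavior shown is that of the OpenJDK implementation. The chain is built on a future that is still incomplete, and a separate thread completes it afterwards, so that thread ends up running the dependent stages.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, introduced only for this illustration.
class ChainFirstDemo {

    // Builds the chain before completion and returns the names of the
    // threads that executed the thenApply and thenAccept stages.
    static List<String> runChain() {
        List<String> threads = new CopyOnWriteArrayList<>();
        CompletableFuture<Integer> start = new CompletableFuture<>();

        // Build the whole chain while the first future is still incomplete.
        CompletableFuture<Void> end = start
            .thenApply(i -> {
                threads.add(Thread.currentThread().getName());
                return i + 1;
            })
            .thenAccept(i -> threads.add(Thread.currentThread().getName()));

        // Only now complete the first future from a dedicated thread.
        Thread worker = new Thread(() -> start.complete(1), "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        end.join();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(runChain());
    }
}
```

Even though both stages run in the completing thread here, every completion still goes through the thread-safe state updates described above.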
The only way to have an actual local processing potentially with performance benefits is by using

CompletableFuture.runAsync(() -> {
    System.out.println("supplyAsync: " + Thread.currentThread().getName());
    int i = 1;

    System.out.println("apply: " + Thread.currentThread().getName());
    i = i + 1;

    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

Or, in other words, if you don't want detached processing, don't create detached processing stages in the first place.