Gar*_*son 6 java parallel-processing executor completable-future
我开始熟悉 JavaCompletableFuture组合,使用过 JavaScript 承诺。基本上,组合只是在指定的执行器上安排了链式命令。但是我不确定在执行组合时哪个线程正在运行。
假设我有两个执行者,executor1并且executor2;为简单起见,假设它们是单独的线程池。我安排了一个CompletableFuture(使用非常松散的描述):
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);
Run Code Online (Sandbox Code Playgroud)
然后,当做到这一点我转换Foo到Bar使用第二执行人:
CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);
Run Code Online (Sandbox Code Playgroud)
我知道getFoo()将从executor1线程池中的线程调用。我知道fooToBar()将从executor2线程池中的线程调用。
但是什么线程用于实际的组合,即在getFoo()完成和futureFoo()完成之后;但之前的fooToBar()命令被安排在executor2?换句话说,哪个线程实际运行代码以在第二个执行器上调度第二个命令?
调度是否作为executor1调用的同一线程的一部分执行getFoo()?如果是这样,这个可完成的未来组合是否等同于我fooToBar()在executor1任务的第一个命令中自己手动安排?
这是故意未指定的。在实践中,当Async调用没有后缀的变体并表现出类似的行为时,它将由同样处理链式操作的相同代码处理。
所以当我们使用下面的测试代码
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return "";
}, r -> new Thread(r, "A").start())
.thenAcceptAsync(s -> {}, r -> {
System.out.println("scheduled by " + Thread.currentThread());
new Thread(r, "B").start();
});
Run Code Online (Sandbox Code Playgroud)
它可能会打印
scheduled by Thread[A,5,main]
Run Code Online (Sandbox Code Playgroud)
因为完成前一阶段的线程用于调度依赖操作。
但是当我们使用
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "",
r -> new Thread(r, "A").start());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
first.thenAcceptAsync(s -> {}, r -> {
System.out.println("scheduled by " + Thread.currentThread());
new Thread(r, "B").start();
});
Run Code Online (Sandbox Code Playgroud)
它可能会打印
scheduled by Thread[main,5,main]
Run Code Online (Sandbox Code Playgroud)
到主线程调用时thenAcceptAsync,第一个 future 已经完成,主线程将自行调度操作。
但这并不是故事的结局。当我们使用
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
return "";
}, r -> new Thread(r, "A").start());
Set<String> s = ConcurrentHashMap.newKeySet();
Runnable submitter = () -> {
String n = Thread.currentThread().getName();
do {
for(int i = 0; i < 1000; i++)
first.thenAcceptAsync(x -> s.add(n+" "+Thread.currentThread().getName()),
Runnable::run);
} while(!first.isDone());
};
Thread b = new Thread(submitter, "B");
Thread c = new Thread(submitter, "C");
b.start();
c.start();
b.join();
c.join();
System.out.println(s);
Run Code Online (Sandbox Code Playgroud)
它不仅可以打印组合B A和C A来自第一个场景B B和C C来自第二个场景。在我的机器上,它也可重现地打印组合,B C并C B指示thenAcceptAsync一个线程传递给的操作被另一个线程同时调用thenAcceptAsync不同的操作提交给执行程序。
这与评估此答案中描述的传递给thenApply(不带Async)的函数的线程的场景相匹配。正如开头所说,这正是我所期望的,因为这两件事很可能由相同的代码处理。但与评估传递给 的函数的线程不同,文档中甚至没有提到调用方法的线程。所以理论上,另一个实现可以使用一个完全不同的线程,既不调用未来的方法也不完成它。thenApplyexecuteExecutor
最后是一个简单的程序,它与您的代码片段相似,并允许您使用它。
输出确认您提供的执行器被调用来完成(除非您足够早地显式调用完成 - 这会在完成的调用线程中发生),当它等待的条件准备就绪时 - Future 上的 get() 会阻塞,直到未来已经完成。
提供一个参数 - 有一个执行器 1 和执行器 2,不提供参数则只有一个执行器。输出是(相同的执行器 - 事物在同一执行器中作为单独的任务顺序运行)-
In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-1-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed
Run Code Online (Sandbox Code Playgroud)
OR(两个执行程序 - 事物再次按顺序运行,但使用不同的执行程序)-
In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-2-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed
Run Code Online (Sandbox Code Playgroud)
请记住:带有执行器的代码(在本示例中可以立即在另一个线程中启动。甚至在设置 FooToBar 之前就调用了 getFoo)。
代码如下-
package your.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
public class TestCompletableFuture {
private static void dumpWhichThread(final String msg) {
System.err.println("In thread " + Thread.currentThread().toString() + " - " + msg);
}
private static final class Foo {
final int i;
Foo(int i) {
this.i = i;
}
};
public static Supplier<Foo> getFoo() {
dumpWhichThread("getFoo");
return new Supplier<Foo>() {
@Override
public Foo get() {
dumpWhichThread("Supplying Foo");
return new Foo(10);
}
};
}
private static final class Bar {
final String j;
public Bar(final String j) {
this.j = j;
}
};
public static Function<Foo, Bar> getFooToBar() {
dumpWhichThread("getFooToBar");
return new Function<Foo, Bar>() {
@Override
public Bar apply(Foo t) {
dumpWhichThread("fooToBar");
return new Bar("" + t.i);
}
};
}
public static void main(final String args[]) throws InterruptedException, ExecutionException, TimeoutException {
final TestCompletableFuture obj = new TestCompletableFuture();
obj.running(args.length == 0);
}
private String running(final boolean sameExecutor) throws InterruptedException, ExecutionException, TimeoutException {
final Executor executor1 = Executors.newSingleThreadExecutor();
final Executor executor2 = sameExecutor ? executor1 : Executors.newSingleThreadExecutor();
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(getFoo(), executor1);
CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(getFooToBar(), executor2);
try {
// Try putting a complete here before the get ..
return futureBar.get(50, TimeUnit.SECONDS).j;
}
finally {
dumpWhichThread("Completed");
}
}
}
Run Code Online (Sandbox Code Playgroud)
哪个线程触发 Bar 阶段进行 - 在上面 - 它是 executor1。一般来说,完成 future 的线程(即给它一个值)是释放依赖于它的事物的线程。如果您立即在主线程上完成 FutureFoo - 它将是触发它的那个。
所以你必须小心这一点。如果您有“N”件事都在等待未来的结果 - 但仅使用单线程执行程序 - 那么计划的第一个执行程序将阻塞该执行程序,直到它完成。您可以推断出 M 个线程、N 个 future - 它可以衰减为“M”锁,从而阻止其余事情的进展。
| 归档时间: |
|
| 查看次数: |
319 次 |
| 最近记录: |