Jat*_*tin 67 java concurrency java-8 completable-future
我想转换List<CompletableFuture<X>>
成CompletableFuture<List<T>>
.这非常有用,因为当您有许多异步任务并且需要获得所有异步任务的结果时.
如果其中任何一个失败,则最终的未来将失败.这就是我实施的方式:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
Run Code Online (Sandbox Code Playgroud)
要运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
Run Code Online (Sandbox Code Playgroud)
如果其中任何一个失败则失败.即使有一百万个期货,它也能按预期产出.我遇到的问题是:如果有超过5000个期货,如果其中任何一个失败,我得到一个StackOverflowError
:
java.util.concurrent.CompletableFuture中的java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)中的线程"pool-1-thread-2611"java.lang.StackOverflowError中的异常$ ThenCompose.run(CompletableFuture.java) :1487)java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)at java.util.concurrent.CompletableFuture $ ThenCompose.run( CompletableFuture.java:1487)
我做错了什么?
注意:当任何未来失败时,上述返回的未来将失败.接受的答案也应该采取这一点.
Mis*_*sha 82
用途CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
Run Code Online (Sandbox Code Playgroud)
关于您的实施的一些评论:
您使用的.thenComposeAsync
,.thenApplyAsync
并且.thenCombineAsync
很可能没有做你的期望.这些...Async
方法在单独的线程中运行提供给它们的函数.因此,在您的情况下,您正在将新项添加到列表中以在提供的执行程序中运行.没有必要将轻量级操作填充到缓存的线程执行程序中.thenXXXXAsync
没有充分理由不要使用方法.
此外,reduce
不应该用于累积到可变容器中.即使流顺序时它可能正常工作,但如果要使流并行,它将失败.要执行可变缩减,请.collect
改为使用.
如果要在第一次失败后立即异常完成整个计算,请在sequence
方法中执行以下操作:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
Run Code Online (Sandbox Code Playgroud)
另外,如果要在第一次失败时取消剩余的操作,请在exec.shutdownNow();
之后添加result.completeExceptionally(ex);
.当然,这假设exec
只存在于这一个计算中.如果没有,你将不得不循环并取消每个剩余的Future
单独.
Hol*_*ger 11
正如Misha指出的那样,你过度使用…Async
操作.此外,您正在编写一个复杂的操作链,用于建立依赖关系,这不依赖于您的程序逻辑:
然后,可以递归地执行取消(显式地或由于异常)这个递归组合的作业,并且可能会失败StackOverflowError
.这是依赖于实现的.
正如Misha已经显示的那样,有一种方法allOf
可以让你模拟你的初衷,定义一个取决于你列表中所有工作的工作.
但是,值得注意的是,即使这样也没有必要.由于您使用的是无界线程池执行程序,因此您只需将收集结果的异步作业发布到列表中即可完成.无论如何,通过询问每项工作的结果来暗示等待完成.
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
.mapToObj(x -> CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
() -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executorService);
Run Code Online (Sandbox Code Playgroud)
当线程数量有限且作业可能产生额外的异步作业时,使用编写相关操作的方法很重要,以避免等待作业从必须首先完成的作业中窃取线程,但这里也不是这种情况.
在这种特定情况下,一个作业简单地迭代大量的先决条件作业并且在必要时等待可能比对大量依赖性进行建模并使每个作业通知从属作业有关完成更有效.
您可以获得Spotify的CompletableFutures
库和使用allAsList
方法.我认为它的灵感来自Guava的Futures.allAsList
方法.
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {
Run Code Online (Sandbox Code Playgroud)
如果您不想使用库,这是一个简单的实现:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()])
).thenApply(ignored ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}
Run Code Online (Sandbox Code Playgroud)
在 CompletableFuture 上使用 thenCombine 的示例序列操作
public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){
CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());
BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList =
(acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});
BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;
return com.stream()
.reduce(identity,
combineToList,
combineLists);
}
}
Run Code Online (Sandbox Code Playgroud)
如果您不介意使用第 3 方库cyclops-react(我是作者)有一组用于 CompletableFutures(以及 Optionals、Streams 等)的实用方法
List<CompletableFuture<String>> listOfFutures;
CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
Run Code Online (Sandbox Code Playgroud)
为了加上@Misha 接受的答案,它可以进一步扩展为收集器:
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}
Run Code Online (Sandbox Code Playgroud)
现在你可以:
Stream<CompletableFuture<Integer>> stream = Stream.of(
CompletableFuture.completedFuture(1),
CompletableFuture.completedFuture(2),
CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
26749 次 |
最近记录: |