等待未来的名单

use*_*796 119 java multithreading future

我有一个返回List期货的方法

List<Future<O>> futures = getFutures();
Run Code Online (Sandbox Code Playgroud)

现在我想要等到所有期货都成功处理完成,或者将来输出的任何任务抛出异常.即使一个任务抛出异常,也没有必要等待其他期货.

简单的方法是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}
Run Code Online (Sandbox Code Playgroud)

但问题是,例如,如果第四个未来抛出异常,那么我将不必要地等待前3个期货可用.

怎么解决这个?会以任何方式倒数闩锁帮助吗?我无法使用Future,isDone因为java doc说

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
Run Code Online (Sandbox Code Playgroud)

dce*_*chi 112

您可以使用CompletionService在它们准备好后立即接收期货,如果其中一个抛出异常,则取消处理.像这样的东西:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}
Run Code Online (Sandbox Code Playgroud)

我认为如果其中一个任务抛出错误,您可以进一步改进以取消任何仍在执行的任务.

编辑:我在这里找到了一个更全面的例子:http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/

  • 为了超时等待将来出现在队列中,在`CompletionService`上有一个poll(秒)方法. (2认同)

And*_*ejs 93

如果您使用的是Java 8,则可以使用CompletableFuture和CompletableFuture.allOf更轻松地完成此操作,仅在完成所有提供的CompletableFutures之后才应用回调.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
Run Code Online (Sandbox Code Playgroud)

  • 这里最大的问题是“为什么他们不将所有这些逻辑放入 CompletableFuture.allOf 中并使其返回 T 而不是 void?”为什么我必须将其复制并粘贴到 DecemberB 项目中?Scala 有 `Future.sequence` 为什么 java 没有?:'( (7认同)
  • 这解决了一个不同的问题.如果您有"Future"实例,则无法应用此方法.将"Future"转换为"CompletableFuture"并不容易. (4认同)
  • 嗨@Andrejs,你能解释一下这段代码的作用吗?我在多个地方看到了这个建议,但对实际发生的事情感到困惑。如果其中一个线程失败,如何处理异常? (3认同)
  • @VSEWHGHP 来自 javadoc:如果任何给定的 CompletableFutures 异常完成,则返回的 CompletableFuture 也会这样做,CompletionException 将此异常作为其原因。 (2认同)
  • 是的,所以我正在跟进,有没有什么方法可以使用此代码片段,但获取成功完成的所有其他线程的值?我是否应该迭代 CompletableFutures 列表并调用 get 忽略 CompletableFuture&lt;List&lt;T&gt;&gt; 因为序列函数负责确保所有线程都完成,无论结果还是异常? (2认同)

jmi*_*rez 15

您可以使用ExecutorCompletionService.文档甚至还有一个确切用例的示例:

相反,假设您希望使用任务集的第一个非null结果,忽略任何遇到异常,并在第一个任务准备就绪时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}
Run Code Online (Sandbox Code Playgroud)

需要注意的重要一点是ecs.take()将获得第一个完成的任务,而不仅仅是第一个提交的任务.因此,您应该按照完成执行(或抛出异常)的顺序获取它们.


sen*_*982 9

CompletableFuture在Java 8中使用

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
Run Code Online (Sandbox Code Playgroud)

  • 这应该是公认的答案。它也是 Spring 官方文档的一部分:https://spring.io/guides/gs/async-method/ (6认同)
  • @maaw 问题没有指定 spring,答案也没有显示如何构建这样完整的未来。Executor接口在submit方法中指定了一个常规的Future&lt;T&gt;。 (2认同)