ListenableFuture回调执行顺序

Ben*_*ler 5 java parallel-processing guava java-7

Guava的ListenableFuture库提供了一种将回调添加到未来任务的机制。这样做如下:

ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
    @Override
    public void onSuccess(@Nullable MyClass myClass) {
      doSomething(myClass);
    }

    @Override
    public void onFailure(Throwable t) {
      printWarning(t);
    }}, myCallbackExecutor);
}
Run Code Online (Sandbox Code Playgroud)

您可以ListenableFuture通过调用其get函数来等待a 完成。例如:

MyClass myClass = future.get();
Run Code Online (Sandbox Code Playgroud)

我的问题是,一定将来的所有回调都可以保证在get终止之前运行。即如果将来在许多回调执行程序上注册了许多回调,那么所有的回调会在get返回之前完成吗?

编辑

我的用例是,我将一个构建器传递给许多类。每个类填充构建器的一个字段。我希望所有字段都异步填充,因为每个字段都需要一个外部查询来生成该字段的数据。我希望给我打电话的用户asyncPopulateBuilder收到一个Future可以打电话的电话,get并确保所有字段都已填充。我认为的方法如下:

final Builder b;
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
  @Override
  public void onSuccess(@Nullable MyClass myClass) {
    b.setMyClass(myClass);
  }

  @Override
  public void onFailure(Throwable t) {
    printWarning(t);
  }}, myCallbackExecutor);
}
// Do the same thing for all other fields.
Run Code Online (Sandbox Code Playgroud)

在这种情况下,直到所有字段都被填充之前,建议采用哪种阻塞方式?

Chr*_*irk 4

不保证回调在返回之前运行get。下面详细介绍一下。

至于如何解决这个用例,我建议将对每个字段数据的查询变成一个单独的Future,将它们与allAsList+结合起来transform,然后采取行动。(有一天我们可能会提供“合并”步骤的快捷方式。)

ListenableFuture<MyClass> future = myExecutor.submit(myCallable);

final ListenableFuture<Foo> foo =
    Futures.transform(
        future,
        new Function<MyClass, Foo>() { ... },
        myCallbackExecutor);
final ListenableFuture<Bar> bar = ...;
final ListenableFuture<Baz> baz = ...;

ListenableFuture<?> allAvailable = Futures.allAsList(foo, bar, baz);
ListenableFuture<?> allSet = Futures.transform(
    allAvailable, 
    new Function<Object, Object>() {
      @Override
      public Object apply(Object ignored) {
        // Use getUnchecked, since we know they already succeeded:
        builder.setFoo(Futures.getUnchecked(foo));
        builder.setFoo(Futures.getUnchecked(bar));
        builder.setFoo(Futures.getUnchecked(baz));
        return null;
      }
    }
};
Run Code Online (Sandbox Code Playgroud)

现在用户可以打电话allSet.get()等待人口。

(或者您可能希望allSet成为 a ,Future<Builder>以便用户能够获得对构建器的引用。或者您可能根本不需要完整的Future,只需要 a CountDownLatch,您可以在addCallback其中使用而不是transform并对锁存器进行倒计时在回调结束时。)

这种方法还可以简化错误处理。


RE:“回调之前运行吗get?”

首先,我很确定我们在规范中的任何地方都不能保证这一点,所以感谢您提出问题而不是仅仅追求它:)如果您最终想要依赖当前实现的某些行为,请提出问题这样我们就可以添加文档和测试。

其次,如果我非常字面地理解你的问题,你所要求的是不可能的:如果get()等待所有侦听器完成,那么任何调用的侦听器get()都会挂起!

您的问题的一个稍微宽松的版本是“所有听众至少在返回之前开始吗?” get()事实证明这也是不可能的:假设我将两个侦听器附加到同一个侦听器上Future以运行directExecutor(). 两个监听者只需调用get()并返回即可。其中一名听众必须先运行。当它调用 时get(),它将挂起,因为第二个侦听器尚未启动 - 在第一个侦听器完成之前也无法启动。(更一般地说,依赖任何给定Executor来迅速执行任务可能是危险的。)

一个更宽松的版本是“在返回之前Future至少会调用submit()每个侦听器吗get()?” 但这最终会导致与我刚才描述的场景相同的问题:调用submit(firstListener)adirectExecutor()运行任务并调用get(),直到第二个侦听器启动后才能完成,而在第一个侦听器完成之前不会发生。

如果有什么不同的话,那就是它开始听起来更有可能在任何侦听器执行之前get()返回。但由于线程调度的不可预测性,我们也不能依赖它。(再次强调:它没有记录,所以请不要依赖它,除非您要求记录它!)

  • @BenjyKessler:一种选择可能是创建一个“CountDownLatch”,并让每个回调在完成其工作后对锁存器进行倒计时。另一个线程可以在闩锁上“await”,知道当“await”成功解除阻塞时,所有字段都已设置。 (3认同)