Ben*_*ler 5 java parallel-processing guava java-7
Guava的ListenableFuture库提供了一种将回调添加到未来任务的机制。这样做如下:
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
doSomething(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
Run Code Online (Sandbox Code Playgroud)
您可以ListenableFuture通过调用其get函数来等待a 完成。例如:
MyClass myClass = future.get();
Run Code Online (Sandbox Code Playgroud)
我的问题是,一定将来的所有回调都可以保证在get终止之前运行。即如果将来在许多回调执行程序上注册了许多回调,那么所有的回调会在get返回之前完成吗?
编辑
我的用例是,我将一个构建器传递给许多类。每个类填充构建器的一个字段。我希望所有字段都异步填充,因为每个字段都需要一个外部查询来生成该字段的数据。我希望给我打电话的用户asyncPopulateBuilder收到一个Future可以打电话的电话,get并确保所有字段都已填充。我认为的方法如下:
final Builder b;
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
b.setMyClass(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
// Do the same thing for all other fields.
Run Code Online (Sandbox Code Playgroud)
在这种情况下,直到所有字段都被填充之前,建议采用哪种阻塞方式?
不保证回调在返回之前运行get。下面详细介绍一下。
至于如何解决这个用例,我建议将对每个字段数据的查询变成一个单独的Future,将它们与allAsList+结合起来transform,然后采取行动。(有一天我们可能会提供“合并”步骤的快捷方式。)
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
final ListenableFuture<Foo> foo =
Futures.transform(
future,
new Function<MyClass, Foo>() { ... },
myCallbackExecutor);
final ListenableFuture<Bar> bar = ...;
final ListenableFuture<Baz> baz = ...;
ListenableFuture<?> allAvailable = Futures.allAsList(foo, bar, baz);
ListenableFuture<?> allSet = Futures.transform(
allAvailable,
new Function<Object, Object>() {
@Override
public Object apply(Object ignored) {
// Use getUnchecked, since we know they already succeeded:
builder.setFoo(Futures.getUnchecked(foo));
builder.setFoo(Futures.getUnchecked(bar));
builder.setFoo(Futures.getUnchecked(baz));
return null;
}
}
};
Run Code Online (Sandbox Code Playgroud)
现在用户可以打电话allSet.get()等待人口。
(或者您可能希望allSet成为 a ,Future<Builder>以便用户能够获得对构建器的引用。或者您可能根本不需要完整的Future,只需要 a CountDownLatch,您可以在addCallback其中使用而不是transform并对锁存器进行倒计时在回调结束时。)
这种方法还可以简化错误处理。
RE:“回调之前运行吗get?”
首先,我很确定我们在规范中的任何地方都不能保证这一点,所以感谢您提出问题而不是仅仅追求它:)如果您最终想要依赖当前实现的某些行为,请提出问题这样我们就可以添加文档和测试。
其次,如果我非常字面地理解你的问题,你所要求的是不可能的:如果get()等待所有侦听器完成,那么任何调用的侦听器get()都会挂起!
您的问题的一个稍微宽松的版本是“所有听众至少在返回之前开始吗?” get()事实证明这也是不可能的:假设我将两个侦听器附加到同一个侦听器上Future以运行directExecutor(). 两个监听者只需调用get()并返回即可。其中一名听众必须先运行。当它调用 时get(),它将挂起,因为第二个侦听器尚未启动 - 在第一个侦听器完成之前也无法启动。(更一般地说,依赖任何给定Executor来迅速执行任务可能是危险的。)
一个更宽松的版本是“在返回之前Future至少会调用submit()每个侦听器吗get()?” 但这最终会导致与我刚才描述的场景相同的问题:调用submit(firstListener)adirectExecutor()运行任务并调用get(),直到第二个侦听器启动后才能完成,而在第一个侦听器完成之前不会发生。
如果有什么不同的话,那就是它开始听起来更有可能在任何侦听器执行之前get()返回。但由于线程调度的不可预测性,我们也不能依赖它。(再次强调:它没有记录,所以请不要依赖它,除非您要求记录它!)
| 归档时间: |
|
| 查看次数: |
3658 次 |
| 最近记录: |