use*_*555 7 java reactive-programming rx-java
我正在寻找等待异步任务在rx-java中完成的最佳方法.
作为一个常见示例,有一个函数从本地存储获取id列表,然后在远程系统中查询这些Id,然后将远程系统结果合并到单个报告中并返回给函数的调用者.由于对远程系统的调用很慢,我们希望它们以异步方式完成,我只想在所有调用都返回并且其结果已经处理后返回.
我发现这样做的唯一可靠方法是轮询订阅以检查它是否已取消订阅.但我认为似乎并不是'RX'做事的方式!
作为一个例子,我从http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/中取了一个例子并稍微修改它以使它成为非android并显示我的意思.我必须在main()方法的代码中使用以下代码来阻止它立即退出.
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
Run Code Online (Sandbox Code Playgroud)
下面列出了该示例的完整代码(如果您尝试编译它,则依赖于http://square.github.io/retrofit/)
package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
// Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
Run Code Online (Sandbox Code Playgroud)
如果您正在询问如何将异步Observable转换为同步Observable,您可以使用.toBlocking()然后使用.last()等待Observable完成.例如,在计时器完成之前,以下内容将不会继续虽然计时器在计算线程上执行.
try {
Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last(); // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
// No items or error.
}
Run Code Online (Sandbox Code Playgroud)
除非绝对必要,否则您可能不应该使用此方法.通常,应用程序终止将由其他一些事件(按键,关闭菜单项单击等)控制.您的网络请求Observables将异步完成,您将在onNext(),onCompleted()和onError()调用中执行操作更新UI或显示错误.
此外,Retrofit的优点在于它内置了对RxJava的支持,并将在后台执行网络请求.要使用该支持,请将接口声明为返回Observable.
interface ApiManagerService {
@GET("/weather")
Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Run Code Online (Sandbox Code Playgroud)
这允许您将getWeatherData()方法简化为:
public static Observable<WeatherData> getWeatherData(final String city) {
return apiManager.getWeather(city, "metric");
}
Run Code Online (Sandbox Code Playgroud)
在大多数情况下,最好不要阻塞方法。尽量做到非阻塞。如果您需要在 Observable 完成后执行某些操作,只需从onCompleted回调中调用您的逻辑,或使用doOnCompleted运算符。
如果您确实需要阻止,那么您可以使用Blocking Observable Operators. 大多数这些运算符都会阻塞,直到 Observable 完成。
然而,阻止你的主要方法过早退出可以通过System.in.read()最后一个简单的方法来实现。
| 归档时间: |
|
| 查看次数: |
14134 次 |
| 最近记录: |