等待并行RX用户完成

use*_*555 7 java reactive-programming rx-java

我正在寻找等待异步任务在rx-java中完成的最佳方法.

作为一个常见示例,有一个函数从本地存储获取id列表,然后在远程系统中查询这些Id,然后将远程系统结果合并到单个报告中并返回给函数的调用者.由于对远程系统的调用很慢,我们希望它们以异步方式完成,我只想在所有调用都返回并且其结果已经处理后返回.

我发现这样做的唯一可靠方法是轮询订阅以检查它是否已取消订阅.但我认为似乎并不是'RX'做事的方式!

作为一个例子,我从http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/中取了一个例子并稍微修改它以使它成为非android并显示我的意思.我必须在main()方法的代码中使用以下代码来阻止它立即退出.

while (!subscription.isUnsubscribed()) {
    Thread.sleep(100);
}
Run Code Online (Sandbox Code Playgroud)

下面列出了该示例的完整代码(如果您尝试编译它,则依赖于http://square.github.io/retrofit/)

package org.examples;

import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class AsyncExample {

    public static void main(String[] args) throws InterruptedException {
        final Subscription subscription = Observable.from("London", "Paris", "Berlin")
                .flatMap(new Func1<String, Observable<WeatherData>>() {
                    @Override
                    public Observable<WeatherData> call(String s) {
                        return ApiManager.getWeatherData(s);
                    }
                })
                .subscribe(
                        new Action1<WeatherData>() {
                            @Override
                            public void call(WeatherData weatherData) {
                                System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
                            }
                        },
                        new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
                            }
                        },
                        new Action0() {
                            @Override
                            public void call() {
                                System.out.println(Thread.currentThread().getName() + " - COMPLETED");
                            }
                        }
                );

        // Have to poll subscription to check if its finished - is this the only way to do it?
        while (!subscription.isUnsubscribed()) {
            Thread.sleep(100);
        }
    }
}

class ApiManager {

    private interface ApiManagerService {
        @GET("/weather")
        WeatherData getWeather(@Query("q") String place, @Query("units") String units);
    }

    private static final RestAdapter restAdapter = new RestAdapter.Builder()
            .setEndpoint("http://api.openweathermap.org/data/2.5")
            .build();
    private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);

    public static Observable<WeatherData> getWeatherData(final String city) {
        return Observable.create(new Observable.OnSubscribe<WeatherData>() {
            @Override
            public void call(Subscriber<? super WeatherData> subscriber) {
                try {
                    System.out.println(Thread.currentThread() + " - Getting " + city);
                    subscriber.onNext(apiManager.getWeather(city, "metric"));
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}
Run Code Online (Sandbox Code Playgroud)

kjo*_*nes 5

如果您正在询问如何将异步Observable转换为同步Observable,您可以使用.toBlocking()然后使用.last()等待Observable完成.例如,在计时器完成之前,以下内容将不会继续虽然计时器在计算线程上执行.

try {
    Observable
            .timer(10, TimeUnit.SECONDS)
            .toBlocking()
            .last();    // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
    // No items or error.
}
Run Code Online (Sandbox Code Playgroud)

除非绝对必要,否则您可能不应该使用此方法.通常,应用程序终止将由其他一些事件(按键,关闭菜单项单击等)控制.您的网络请求Observables将异步完成,您将在onNext(),onCompleted()和onError()调用中执行操作更新UI或显示错误.

此外,Retrofit的优点在于它内置了对RxJava的支持,并将在后台执行网络请求.要使用该支持,请将接口声明为返回Observable.

interface ApiManagerService {
    @GET("/weather")
    Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Run Code Online (Sandbox Code Playgroud)

这允许您将getWeatherData()方法简化为:

public static Observable<WeatherData> getWeatherData(final String city) {
    return apiManager.getWeather(city, "metric");
}
Run Code Online (Sandbox Code Playgroud)


Geo*_*iki 1

在大多数情况下,最好不要阻塞方法。尽量做到非阻塞。如果您需要在 Observable 完成后执行某些操作,只需从onCompleted回调中调用您的逻辑,或使用doOnCompleted运算符。

如果您确实需要阻止,那么您可以使用Blocking Observable Operators. 大多数这些运算符都会阻塞,直到 Observable 完成。

然而,阻止你的主要方法过早退出可以通过System.in.read()最后一个简单的方法来实现。