RX:并行运行Zipped Observables?

spi*_*ce7 15 java system.reactive rx-java

所以我正在玩RX(非常酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回observables.

所以我开始尝试解决的问题之一就是"如果我想要进行3次API调用,得到结果,然后在完成后再进行一些处理,该怎么办?"

我花了一个小时或两个小时,但我最终找到了Zip功能,它帮助我轻松地:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

大!这太酷了.

因此,当我压缩3个可观察量时,它们会串行运行.如果我希望它们同时并行运行,那么我最终会得到更快的结果呢?我玩了几件事,甚至尝试阅读人们用C#编写的一些原始RX内容.我确信有一个简单的答案.谁能指出我正确的方向?这样做的正确方法是什么?

Jam*_*rld 18

zip 确实并行地运行了observable - 但它也连续地订阅了它们.由于您getNumberedObservable在订阅方法中完成,因此给出了连续运行的印象,但实际上没有这样的限制.

您可以尝试使用一些长期运行的Observable,这些Observable比其订阅逻辑更长,例如timer,或者使用该subscribeOn方法异步订阅传递给每个流zip.


Bra*_*don 5

在RxJava中,使用toAsync将常规函数转换为将在线程上运行并将其结果返回到observable中的函数.

我不太了解Java语法,但它看起来像:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};
Run Code Online (Sandbox Code Playgroud)

如果getNumber真的访问数据库,这将工作.当你调用getNumberedObservable它时,返回一个observable,getNumber当你订阅它时,它将在一个单独的线程上运行.