spi*_*ce7 15 java system.reactive rx-java
所以我正在玩RX(非常酷),我一直在转换我的api,它访问Android中的sqlite数据库以返回observables.
所以我开始尝试解决的问题之一就是"如果我想要进行3次API调用,得到结果,然后在完成后再进行一些处理,该怎么办?"
我花了一个小时或两个小时,但我最终找到了Zip功能,它帮助我轻松地:
Observable<Integer> one = getNumberedObservable(1);
Observable<Integer> two = getNumberedObservable(2);
Observable<Integer> three = getNumberedObservable(3);
Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1, Integer arg2) {
System.out.println("Zip0: " + arg0);
System.out.println("Zip1: " + arg1);
System.out.println("Zip2: " + arg2);
return arg0 + arg1 + arg2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer arg0) {
System.out.println("Zipped Result: " + arg0);
}
});
public static Observable<Integer> getNumberedObservable(final int value) {
return Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}
});
}
Run Code Online (Sandbox Code Playgroud)
大!这太酷了.
因此,当我压缩3个可观察量时,它们会串行运行.如果我希望它们同时并行运行,那么我最终会得到更快的结果呢?我玩了几件事,甚至尝试阅读人们用C#编写的一些原始RX内容.我确信有一个简单的答案.谁能指出我正确的方向?这样做的正确方法是什么?
Jam*_*rld 18
zip 确实并行地运行了observable - 但它也连续地订阅了它们.由于您getNumberedObservable在订阅方法中完成,因此给出了连续运行的印象,但实际上没有这样的限制.
您可以尝试使用一些长期运行的Observable,这些Observable比其订阅逻辑更长,例如timer,或者使用该subscribeOn方法异步订阅传递给每个流zip.
在RxJava中,使用toAsync将常规函数转换为将在线程上运行并将其结果返回到observable中的函数.
我不太了解Java语法,但它看起来像:
public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
return rx.util.functions.toAsync(new Func<Integer,Integer>() {
@Override
public Integer call(Integer value) { return getNumber(value); }
});
};
Run Code Online (Sandbox Code Playgroud)
如果getNumber真的访问数据库,这将工作.当你调用getNumberedObservable它时,返回一个observable,getNumber当你订阅它时,它将在一个单独的线程上运行.