我们可以使用cache()运算符来避免多次执行长任务(http请求),并重用其结果:
Observable apiCall = createApiCallObservable().cache(); // notice the .cache()
---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
.subscribe(subscriberA);
---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
.subscribe(subscriberB);
Run Code Online (Sandbox Code Playgroud)
第一次执行http请求,但第二次,因为我们使用了cache()运算符,请求将不会被执行,但我们将能够重用第一个结果.
第一个请求成功完成时,此工作正常.但是如果在第一次尝试中调用了onError,那么下次新订阅者订阅同一个observable时,将再次调用onError而不再尝试http请求.
我们要做的是,如果第一次调用onError,那么下次有人订阅同一个observable时,将从头开始尝试http请求.即,observable将仅缓存成功的api调用,即调用onCompleted的调用.
关于如何进行的任何想法?我们尝试使用retry()和cache()运算符没有太多运气.
在沙发基地,Observables
有什么区别:Schedulers.io()和Schedulers.computation()
我有一个绑定服务,负责下载文件,因此它知道下载状态/进度.UI(片段或活动)必须显示/更新服务的下载进度.
实际上我认为常用的方法是使用BroadcastReciever或CallBack来自Activity.但我听说过使用RxJava(ReactiveX编程)和中介类(以及Dagger将其注入服务和活动),如下所述.
所以我的问题是如何使用这些东西来处理RxJava?任何代码示例?有没有比使用意图更有效的方法?
资源: 从Service更新UI的有效方式比意图更新?[见第一个答案更新]
我有一种预感,对于RxJava中的高度计算,并行化任务,传统的ExecutorService将比a更快Scheduler.
我有一个理论,这个代码
Observable<MyItem> source = ...
source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
会跑得比这慢
final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
Observable<MyItem> source = ...
source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc)))
.finallyDo(svc::shutdown)
.subscribe();
Run Code Online (Sandbox Code Playgroud)
我将这两种方法与我在工作中进行的典型并行处理进行了比较,得到了以下结果.
EXECUTOR
START: 2016-01-25T09:47:04.350
END: 2016-01-25T09:48:37.181
TOTAL TIME (SEC): 92
COMPUTATION SCHEDULER
START: 2016-01-25T09:50:37.799
END: 2016-01-25T09:54:23.674
TOTAL TIME (SEC): 225
Run Code Online (Sandbox Code Playgroud)
所以我的粗略测试表明传统ExecutorService比Scheduler计算更快.
这些结果有原因吗?RxJava调度程序是否未针对并行化进行优化?我得到的结论是计算调度程序使用的线程比执行程序少.
在我们的应用程序中,我们遇到了一个特例 - 如果App.specialFlag == true我们需要,我们需要停止来自代码的任何请求.我们认为,在这种情况下,最好的方法是包括特殊的Interceptor,这将阻止我们的任何请求,如下所示:
if (App.specialFlag) {
// Somehow stop request
} else {
return chain.proceed(chain.request());
}
Run Code Online (Sandbox Code Playgroud)
另外,我们应该注意,我们RxJavaCallAdapterFactory用于将响应包装到Observables中.
但我们没有看到任何在OkHttp拦截器中停止请求的方法.
我们提出了两种方法来解决我们的问题:
a)创建特殊ApiService.class用于包装我们的API中的每个请求,如下所示:
public Observable<User> getUserDetails() {
if (App.specialFlag) {
return Observable.just(null);
}
return api.getUserDetails();
}
Run Code Online (Sandbox Code Playgroud)
但我们认为这是一个丑陋而繁琐的解决方案.
b)RuntimeException在Interceptor中抛出一个,然后在我们使用API的每个地方捕获它.
第二种解决方案也不好,因为我们需要onErrorResumeNext在链中包含很多运算符.
可能有人知道,如何以更"聪明"的方式处理我们的情况?
提前致谢!
如何在收到onNext()后自动取消订阅?
现在我用这个代码:
rxObservable
.compose(bindToLifecycle()) // unsubscribe automatically in onPause() if method was called in onResume()
.subscribe(new Subscriber<Object>() {
...
@Override
public void onNext(Object o) {
unsubscribe();
}
});
Run Code Online (Sandbox Code Playgroud) 我想调用一个函数(同步),然后使用它的返回值作为初始发射(随后将一些其他运算符链接到生成的observable上).
我想在订阅期间调用此函数,所以我不能只使用Observable.of(() => getSomeValue()).我已经看过bindCallback(之前的fromCallback),但我不认为它可以用于这项任务(如果我错了,请纠正我).我start在v4文档中看到了静态运算符,但显然它没有在v5中实现(并且没有表明它在路上).RxJava也有fromCallable运营商完成那个afaik.
我能想到的只有这样:
Observable.create((observer: Observer<void>) => {
let val = getSomeValue();
observer.next(val);
observer.complete();
})
Run Code Online (Sandbox Code Playgroud)
我认为就是这样.但是这对于简单的事情来说似乎很复杂,应该是这样的Observable.fromFunction(() => getSomeValue()).如果我想异步运行它,就像start运算符一样?如何在RxJS的当前版本中执行此操作?
我正在寻找debounce一系列事件的运营商,让我们说用户的点击.输入和输出应该是这样的:
interval : -> <- -> <-
in : 1--2--3-------4--5--5--6-7-8--------
out : 1-------------4---------------------
Run Code Online (Sandbox Code Playgroud)
这个想法就像下划线的immediate选择on http://underscorejs.org/#debounce一样.可以使用支持Reactive Extensions的任何语言呈现/实现运算符
编辑:澄清间隔,比如说5秒(两个箭头之间有5个空格): -> <-
Edit2:一个更容易理解的版本:我有一个用户,他反复点击一个按钮(1,2,3); 我想抓住第一个click(1)并忽略其余部分.过了一会儿,他累了,休息了7秒钟(这比两个箭头之间的5秒间隔长)并继续再次点击按钮(4,5,6,7,8)我想赶上第一个click(4并忽略其余的.
如果他在第四个箭头后点击,我也想抓住那个点击.
Edit3:这是一张图片
可以在原始文章中找到
当我们使用retrofit2与Rx进行API休息调用时,使用Single或Observable的最佳方法是什么?
public interface ApiService{
Single<Data> getDataFromServer();
Observable<Data> getDataFromServer();
}
Run Code Online (Sandbox Code Playgroud) 我们正在为我们的一个项目评估反应式编程框架.我刚刚学习了vert.x教程.我检查了一下RxJava的介绍.RxJava似乎更接近CompletableFuture.但是,尽管存在底层模式,RxJava和Vert.x都可以访问非阻塞编程.我很困惑他们之间有什么区别.在这方面,我将不胜感激.
rx-java ×10
rx-android ×4
android ×3
rxjs ×2
concurrency ×1
couchbase ×1
debounce ×1
java ×1
javascript ×1
observable ×1
okhttp ×1
retrofit ×1
retrofit2 ×1
rxjs5 ×1
ui-thread ×1
vert.x ×1