标签: rx-java

可观察,仅在完成时重试错误和缓存

我们可以使用cache()运算符来避免多次执行长任务(http请求),并重用其结果:

Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
               .subscribe(subscriberB);
Run Code Online (Sandbox Code Playgroud)

第一次执行http请求,但第二次,因为我们使用了cache()运算符,请求将不会被执行,但我们将能够重用第一个结果.

第一个请求成功完成时,此工作正常.但是如果在第一次尝试中调用了onError,那么下次新订阅者订阅同一个observable时,将再次调用onError而不再尝试http请求.

我们要做的是,如果第一次调用onError,那么下次有人订阅同一个observable时,将从头开始尝试http请求.即,observable将仅缓存成功的api调用,即调用onCompleted的调用.

关于如何进行的任何想法?我们尝试使用retry()和cache()运算符没有太多运气.

rx-java rx-android

15
推荐指数
3
解决办法
4402
查看次数

Schedulers.io()和Schedulers.computation()之间的区别是什么

在沙发基地,Observables

有什么区别:Schedulers.io()和Schedulers.computation()

observable couchbase rx-java

15
推荐指数
2
解决办法
6954
查看次数

如何使用RxJava/RxAndroid从Android服务更新UI

我有一个绑定服务,负责下载文件,因此它知道下载状态/进度.UI(片段或活动)必须显示/更新服务的下载进度.

实际上我认为常用的方法是使用BroadcastRecieverCallBack来自Activity.但我听说过使用RxJava(ReactiveX编程)和中介类(以及Dagger将其注入服务和活动),如下所述.

所以我的问题是如何使用这些东西来处理RxJava?任何代码示例?有没有比使用意图更有效的方法?

资源: 从Service更新UI的有效方式比意图更新?[见第一个答案更新]

android ui-thread android-service rx-java rx-android

15
推荐指数
2
解决办法
6821
查看次数

RxJava - 调度程序与ExecutorService?

我有一种预感,对于RxJava中的高度计算,并行化任务,传统的ExecutorService将比a更快Scheduler.

我有一个理论,这个代码

Observable<MyItem> source = ...

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation()))
.subscribe();
Run Code Online (Sandbox Code Playgroud)

会跑得比这慢

final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
Observable<MyItem> source = ...

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc)))
.finallyDo(svc::shutdown)
.subscribe();
Run Code Online (Sandbox Code Playgroud)

我将这两种方法与我在工作中进行的典型并行处理进行了比较,得到了以下结果.

EXECUTOR

START: 2016-01-25T09:47:04.350
END: 2016-01-25T09:48:37.181
TOTAL TIME (SEC): 92


COMPUTATION SCHEDULER

START: 2016-01-25T09:50:37.799
END: 2016-01-25T09:54:23.674
TOTAL TIME (SEC): 225
Run Code Online (Sandbox Code Playgroud)

所以我的粗略测试表明传统ExecutorServiceScheduler计算更快.

这些结果有原因吗?RxJava调度程序是否未针对并行化进行优化?我得到的结论是计算调度程序使用的线程比执行程序少.

java concurrency rx-java

15
推荐指数
1
解决办法
7026
查看次数

我们是否有可能在OkHttp拦截器中停止请求?

在我们的应用程序中,我们遇到了一个特例 - 如果App.specialFlag == true我们需要,我们需要停止来自代码的任何请求.我们认为,在这种情况下,最好的方法是包括特殊的Interceptor,这将阻止我们的任何请求,如下所示:

if (App.specialFlag) {
    // Somehow stop request
} else {
    return chain.proceed(chain.request());
}
Run Code Online (Sandbox Code Playgroud)

另外,我们应该注意,我们RxJavaCallAdapterFactory用于将响应包装到Observables中.

但我们没有看到任何在OkHttp拦截器中停止请求的方法.

我们提出了两种方法来解决我们的问题:

a)创建特殊ApiService.class用于包装我们的API中的每个请求,如下所示:

public Observable<User> getUserDetails() {
    if (App.specialFlag) {
        return Observable.just(null);
    }
    return api.getUserDetails();
}
Run Code Online (Sandbox Code Playgroud)

但我们认为这是一个丑陋而繁琐的解决方案.

b)RuntimeException在Interceptor中抛出一个,然后在我们使用API​​的每个地方捕获它.

第二种解决方案也不好,因为我们需要onErrorResumeNext在链中包含很多运算符.

可能有人知道,如何以更"聪明"的方式处理我们的情况?

提前致谢!

android rx-java retrofit okhttp

15
推荐指数
2
解决办法
6978
查看次数

Rx - 收到onNext()后如何自动取消订阅?

如何在收到onNext()后自动取消订阅?

现在我用这个代码:

rxObservable
.compose(bindToLifecycle()) // unsubscribe automatically in onPause() if method was called in onResume()
.subscribe(new Subscriber<Object>() {
     ...
     @Override
     public void onNext(Object o) {
         unsubscribe();
     }
 });
Run Code Online (Sandbox Code Playgroud)

rx-java rx-android

15
推荐指数
1
解决办法
5592
查看次数

如何从函数创建Observable?

我想调用一个函数(同步),然后使用它的返回值作为初始发射(随后将一些其他运算符链接到生成的observable上).

我想在订阅期间调用此函数,所以我不能只使用Observable.of(() => getSomeValue()).我已经看过bindCallback(之前的fromCallback),但我不认为它可以用于这项任务(如果我错了,请纠正我).我start在v4文档中看到了静态运算符,但显然它没有在v5中实现(并且没有表明它在路上).RxJava也有fromCallable运营商完成那个afaik.

我能想到的只有这样:

Observable.create((observer: Observer<void>) => {
  let val = getSomeValue();
  observer.next(val);
  observer.complete();
})
Run Code Online (Sandbox Code Playgroud)

我认为就是这样.但是这对于简单的事情来说似乎很复杂,应该是这样的Observable.fromFunction(() => getSomeValue()).如果我想异步运行它,就像start运算符一样?如何在RxJS的当前版本中执行此操作?

javascript system.reactive rxjs rx-java rxjs5

15
推荐指数
3
解决办法
1万
查看次数

立即在Rx中去抖动

我正在寻找debounce一系列事件的运营商,让我们说用户的点击.输入和输出应该是这样的:

interval :      ->     <-            ->     <-
in       : 1--2--3-------4--5--5--6-7-8--------
out      : 1-------------4---------------------
Run Code Online (Sandbox Code Playgroud)

这个想法就像下划线的immediate选择on http://underscorejs.org/#debounce一样.可以使用支持Reactive Extensions的任何语言呈现/实现运算符

编辑:澄清间隔,比如说5秒(两个箭头之间有5个空格): -> <-

Edit2:一个更容易理解的版本:我有一个用户,他反复点击一个按钮(1,2,3); 我想抓住第一个click(1)并忽略其余部分.过了一会儿,他累了,休息了7秒钟(这比两个箭头之间的5秒间隔长)并继续再次点击按钮(4,5,6,7,8)我想赶上第一个click(4并忽略其余的.

如果他在第四个箭头后点击,我也想抓住那个点击.

Edit3:这是一张图片 图片可以在原始文章中找到

system.reactive rxjs rx-java debounce

15
推荐指数
2
解决办法
9979
查看次数

Rx 2 Android对于api调用有什么更好的Single或Observable?

当我们使用retrofit2与Rx进行API休息调用时,使用Single或Observable的最佳方法是什么?

public interface ApiService{

Single<Data> getDataFromServer();

Observable<Data> getDataFromServer();
}
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android retrofit2

15
推荐指数
2
解决办法
3674
查看次数

vert.x和RxJava之间的区别

我们正在为我们的一个项目评估反应式编程框架.我刚刚学习了vert.x教程.我检查了一下RxJava的介绍.RxJava似乎更接近CompletableFuture.但是,尽管存在底层模式,RxJava和Vert.x都可以访问非阻塞编程.我很困惑他们之间有什么区别.在这方面,我将不胜感激.

vert.x rx-java

15
推荐指数
3
解决办法
5930
查看次数