Dan*_*okh 10 networking android long-polling rx-java retrofit
我真的很喜欢RxJava,它是一个很棒的工具,但有些时候很难理解它是如何工作的.我们在Android项目中使用Retrofit和RxJava,并且有以下用例:
我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作.服务器完成后,我必须提供结果.所以我用RxJava成功完成了它,这里是代码片段:我用"skipWhile"和"repeatWhen"
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
Run Code Online (Sandbox Code Playgroud)
代码工作正常:
当服务器响应该作业正在处理时,我从"skipWhile"链返回"true",原始的Observable等待1秒并再次执行http请求.重复此过程,直到我从"skipWhile"链返回"false".
这是我不明白的一些事情:
我在"skipWhile"的文档中看到它不会从原始Observable发出任何内容(onError,onNext,onComplete),直到我从其"call"方法返回"false".所以如果它没有发出任何东西为什么"repeatWhen"Observable做它的工作?它等待一秒钟并再次运行请求.是谁推出的?
第二个问题是:为什么"repeatWhen"中的Observable不会永远运行,我的意思是当我从"skipWhile"返回"false"时它为什么会停止重复?如果我返回"false",我会在订阅者中成功获取onNext.
在"repeatWhile"的文档中,它说我最终在订阅者中调用了"onComplete",但从未调用过"onComplete".
如果我改变链接"skipWhile"和"repeatWhen"的顺序没有区别.这是为什么 ?
我知道RxJava是开源的,我只能阅读代码,但正如我所说 - 这真的很难理解.
谢谢.
Dan*_*Lew 14
我之前没有合作repeatWhen过,但这个问题让我很好奇,所以我做了一些研究.
skipWhile 并发出onError和onCompleted,即使它永远不会返回true之前然后.因此,repeatWhen每次checkJob()发出都会被调用onCompleted.这回答了问题#1.
其余的问题都是基于错误的假设.您的订阅永远在运行,因为您repeatWhen永远不会终止.那是因为repeatWhen比你意识到的更复杂的野兽.在Observable它发射null时它会onCompleted从源头.如果您接受并返回onCompleted则结束,否则如果您发出任何内容则会重试.由于delay只需要发射并延迟它,它总是会null再次发射.因此,它不断重新订阅.
那么#2的答案就是它永远在运行; 您可能正在执行此代码之外的其他操作来取消订阅.对于#3,你永远不会得到onCompleted因为它永远不会完成.对于#4,订单无关紧要,因为您无限期地重复.
现在的问题是,你如何得到正确的行为?它就像使用takeUntil而不是使用一样简单skipWhile.这样,你不断重复,直到得到你想要的结果,从而在你希望它结束时终止流.
这是一个代码示例:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
Run Code Online (Sandbox Code Playgroud)
在这个例子中,source是发射布尔.我每1秒重复一次,直到信号源发出true.我继续服用直到result为true.我会过滤掉所有通知false,因此订阅者直到收到通知才会收到通知true.
| 归档时间: |
|
| 查看次数: |
3417 次 |
| 最近记录: |