如何刷新RxJava中的observable?

Pet*_*toU 6 android functional-programming reactive-programming rx-java

我有一个observable,它包装了一个HTTP请求

mObservable =  retryObservable(mService.getAddressList(getUserId(), true, 1, Integer.MAX_VALUE, "id", true)
            .map(r -> {
                return r.getItems();
            })
            .observeOn(AndroidSchedulers.mainThread()));
Run Code Online (Sandbox Code Playgroud)

然后订阅

mSubscription = mObservable.subscribe(items -> {
                mAddressAdapter.swapItems(items);
            }, getActivityBase()::showError);
Run Code Online (Sandbox Code Playgroud)

订阅初始化到来时,将激活cold observable并触发HTTP请求.现在,我知道基础数据已经发生了变化,我需要做出同样的新请求.我试过了

mSubscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

然后打电话

mObservable.subscribe(items -> {doSomething();})
Run Code Online (Sandbox Code Playgroud)

再次,从我的理解,订阅应该触发可观察,但它不起作用.有什么建议 ?

tom*_*ozb 9

一旦Observable完成它并没有公布任何新项目.这是Rx合同.

将代码包装到方法中,每次都创建一个新的observable.

Observable<?> getObservable() {
    return retryObservable(mService.getAddressList(getUserId(), true, 1, Integer.MAX_VALUE, "id", true)
            .map(r -> {
                return r.getItems();
            })
            .observeOn(AndroidSchedulers.mainThread()));
}
Run Code Online (Sandbox Code Playgroud)


正如@DaveSexton在评论中提到的那样,defer在RxJava中使用函数甚至更好

在订阅者订阅之前不要创建Observable; 在每个订阅上创建一个新的Observable

传递defer()一个Observable工厂函数(一个生成Observables的函数),defer()将返回一个Observable,它将调用此函数,以便在每次新的Subscriber订阅时重新生成其Observable序列.

更多信息:https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#defer