Pav*_*los 27 android rx-java retrofit
我有一个Observable<<List<Foo>> getFoo()从Retrofit服务创建的,在调用该
.getFoo()方法后,我需要与多个订阅者共享它..share()但是,调用该方法会导致重新执行网络呼叫.重播运算符也不起作用.我知道可能有一个潜在的解决方案.cache(),但我不知道为什么会出现这种行为.
// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(API_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
subscription1.unsubscribe();
subscription2.unsubscribe();
Run Code Online (Sandbox Code Playgroud)
上面的代码可以重现上述行为.您可以对其进行调试,并查看收到的列表属于不同的MemoryAddress.
我还将ConnectableObservables看作是一个潜在的解决方案,但是这需要我携带原始的observable,.connect()每次我想添加一个新的Subscriber时调用.
这种行为.share()一直很好,直到改造1.9.它停止了Retrofit 2 - beta的工作.我还没有使用几小时前发布的Retrofit 2 Release Version进行测试.
编辑:01/02/2017
dav*_*ola 40
在与RxJava开发人员DávidKarnok核实后,我想提出一个完整的解释,说明这里发生了什么.
share()被定义为publish().refCount(),即源Observable首先被转换为a ConnectableObservable,publish()而不是必须connect()"手动" 调用该部分被处理refCount().特别是,refCount将调用connect()上ConnectableObservable时,它本身接收第一签约; 那么,只要有至少一个用户,它就会保留订阅; 最后,当订阅者数量下降到0时,它将取消订阅.对于冷 Observables,就像Retrofit返回的那样,这将停止任何正在运行的计算.
如果在其中一个用户出现其中一个周期之后,refCount将再次调用connect并因此触发对源Observable的新订阅.在这种情况下,它将触发另一个网络请求.
现在,这通常不会在Retrofit 1(实际上是此提交之前的任何版本)中变得明显,因为默认情况下这些旧版本的Retrofit 会将所有网络请求移动到另一个线程.这通常意味着您的所有subscribe()调用都会在第一个请求/ Observable仍在运行时发生,因此新的Subscribers只会被添加到其中refCount,因此不会触发其他请求/ Observables.
然而,较新版本的Retrofit不会默认将工作移动到另一个线程 - 例如,您必须通过调用来明确地执行此操作subscribeOn(Schedulers.io()).如果你不这样做,那么一切都将保留在当前线程中,这意味着第二个subscribe()只会在第一个Observable调用之后发生onCompleted,因此在Subscribers取消订阅并且一切都关闭之后.现在,正如我们在第一段中所看到的,当第二段subscribe()被调用时,share()别无选择,只能使另一个Subscription源到Observable并触发另一个网络请求.
因此,要回到您在Retrofit 1中习惯的行为,只需添加即可subscribeOn(Schedulers.io()).
这应该导致只执行网络请求 - 大多数时候.原则上,您仍然可以获得多个请求(并且您可以使用Retrofit 1),但仅当您的网络请求非常快且/或subscribe()调用发生相当大的延迟时,才会再次完成第一个请求当第二次subscribe()发生时.
因此,Dávid建议使用cache()(但它有你提到的缺点)或replay().autoConnect().根据这些发行说明,autoConnect工作只有前半部分refCount,或者更准确地说,它是
类似于refCount()的行为,除了它在订阅者丢失时不会断开连接.
这意味着只有在第一次subscribe()发生时才触发请求,但随后所有Subscriber发出的项都将接收所有发出的项目,无论是否在0个订户之间的任何时间都有.
Joh*_*wUs 27
你似乎(隐含地)将你的ConnectedObservable回归投射.share()回正常状态Observable.您可能想要了解热和冷可观测量之间的区别.
尝试
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();
Run Code Online (Sandbox Code Playgroud)
编辑:connect()每次想要新订阅时都不需要调用,只需启动它就可以启动observable.我想你可以replay()用来确保所有后续订阅者都能获得所有项目
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share()
.replay()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12366 次 |
| 最近记录: |