具有多个订户的单个可观察者

Pav*_*los 27 android rx-java retrofit

我有一个Observable<<List<Foo>> getFoo()从Retrofit服务创建的,在调用该 .getFoo()方法后,我需要与多个订阅者共享它..share()但是,调用该方法会导致重新执行网络呼叫.重播运算符也不起作用.我知道可能有一个潜在的解决方案.cache(),但我不知道为什么会出现这种行为.

// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
       .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
         });

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
         });

subscription1.unsubscribe();
subscription2.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

上面的代码可以重现上述行为.您可以对其进行调试,并查看收到的列表属于不同的MemoryAddress.

我还将ConnectableObservables看作是一个潜在的解决方案,但是这需要我携带原始的observable,.connect()每次我想添加一个新的Subscriber时调用.

这种行为.share()一直很好,直到改造1.9.它停止了Retrofit 2 - beta的工作.我还没有使用几小时前发布的Retrofit 2 Release Version进行测试.

编辑:01/02/2017

对于未来的读者,我在这里写了一篇文章,详细解释了这个案例!

dav*_*ola 40

在与RxJava开发人员DávidKarnok核实后,我想提出一个完整的解释,说明这里发生了什么.

share()被定义为publish().refCount(),即源Observable首先被转换为a ConnectableObservable,publish()而不是必须connect()"手动" 调用该部分被处理refCount().特别是,refCount将调用connect()ConnectableObservable时,它本身接收第一签约; 那么,只要有至少一个用户,它就会保留订阅; 最后,当订阅者数量下降到0时,它将取消订阅.对于 Observables,就像Retrofit返回的那样,这将停止任何正在运行的计算.

如果在其中一个用户出现其中一个周期之后,refCount将再次调用connect并因此触发对源Observable的新订阅.在这种情况下,它将触发另一个网络请求.

现在,这通常不会在Retrofit 1(实际上是此提交之前的任何版本)中变得明显,因为默认情况下这些旧版本的Retrofit 会将所有网络请求移动到另一个线程.这通常意味着您的所有subscribe()调用都会在第一个请求/ Observable仍在运行时发生,因此新的Subscribers只会被添加到其中refCount,因此不会触发其他请求/ Observables.

然而,较新版本的Retrofit不会默认将工作移动到另一个线程 - 例如,您必须通过调用来明确地执行此操作subscribeOn(Schedulers.io()).如果你不这样做,那么一切都将保留在当前线程中,这意味着第二个subscribe()只会在第一个Observable调用之后发生onCompleted,因此在Subscribers取消订阅并且一切都关闭之后.现在,正如我们在第一段中所看到的,当第二段subscribe()被调用时,share()别无选择,只能使另一个Subscription源到Observable并触发另一个网络请求.

因此,要回到您在Retrofit 1中习惯的行为,只需添加即可subscribeOn(Schedulers.io()).

这应该导致只执行网络请求 - 大多数时候.原则上,您仍然可以获得多个请求(并且您可以使用Retrofit 1),但仅当您的网络请求非常快且/或subscribe()调用发生相当大的延迟时,才会再次完成第一个请求当第二次subscribe()发生时.

因此,Dávid建议使用cache()(但它有你提到的缺点)或replay().autoConnect().根据这些发行说明,autoConnect工作只有前半部分refCount,或者更准确地说,它是

类似于refCount()的行为,除了它在订阅者丢失时不会断开连接.

这意味着只有在第一次subscribe()发生时才触发请求,但随后所有Subscriber发出的项都将接收所有发出的项目,无论是否在0个订户之间的任何时间都有.

  • 大!我只是想添加这个解释,因为`replay`,`share`,`publish`等等都足够复杂,并且对边缘情况的详细解释并没有什么坏处. (2认同)
  • 确实很棒的答案:) (2认同)

Joh*_*wUs 27

你似乎(隐含地)将你的ConnectedObservable回归投射.share()回正常状态Observable.您可能想要了解热和冷可观测量之间的区别.

尝试

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
   .subscribe(new Subscriber<List<Contributor>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onNext(List<Contributor> contributors) {
        System.out.println(contributors);
    }
});

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
        });

testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

编辑:connect()每次想要新订阅时都不需要调用,只需启动它就可以启动observable.我想你可以replay()用来确保所有后续订阅者都能获得所有项目

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share()
        .replay()
Run Code Online (Sandbox Code Playgroud)