使用已处置的观察者不会重新订阅源

Har*_*rry 7 java android rx-android rx-java2

我正在尝试通过仅创建一个实例并将其通过流上的方法通过它们的实例来重用Observerfor SingleObservablestream :DisposableSingleObserver/DisposableObserversubscribeWith()

public class SomeClass {
    private DisposableSingleObserver<Object> observer;

    public SomeClass() {
        observer = new DisposableSingleObserver<Object>() {
            @Override
            public void onSuccess(Object object) {
                ...
            }

            @Override
            public void onError(Throwable throwable) {
                ...
            }
        };
    }

    public void doSomeStuff() {
        singleStream.subscribeOn(...)
            .observeOn(...)
            .subscribeWith(observer);
    }
}
Run Code Online (Sandbox Code Playgroud)

上面的代码导致ProtocolViolationException当我尝试通过消息多次订阅单个观察者实例时:

io.reactivex.exceptions.ProtocolViolationException:不允许多次使用com.package.name.SomeClass $ 1进行订阅。请创建com.package.name.SomeClass $ 1的新实例,然后将其订阅到目标源。

因此,我对代码进行了如下修改:

public class SomeClass {
    ...

    public void doSomeStuff() {
        if (observer != null) {
            observer.dispose();
        }

        singleStream.subscribeOn(...)
            .observerOn(...)
            .subscribeWith(observer);
    }
}
Run Code Online (Sandbox Code Playgroud)

当我执行上述代码时,ProtocolViolationException不再抛出,并且我能够成功地从单个流中获取事件。但是当我尝试doSomeStuff()两次调用方法时,它在第一次调用时成功完成,但是在第二次调用时未发出任何事件。我能够确认doOnSubscribe()两次都发生了订阅,但是单个流的代码发出事件从未在第二次调用时执行。所以我有三个问题:

  1. 为什么ProtocolViolationException首先抛出?
  2. 即使在两种情况下我们都订阅了源文件,为什么我仍能在第一次订阅时获得事件,而在第二次订阅上却没有?
  3. 如何仅使用一个实例来重用观察者?

aka*_*okd 5

1)不允许重用DisposableSingleObserver和及其表亲,因为它们是有状态的,只能使用一次。这是由于该协议强制要求的Single:恰好一个,onSubscribe然后最多一个onSuccess或一个onError。第二次订阅违反了该协议。

2)DisposableSingleObserver将其置于已处置状态,任何后续的订阅尝试都将被视为立即处置。

3)你不应该。为什么要多次订阅,为什么DisposableSingleObserver每次都不能使用新订阅。您应该重新考虑数据流。