所有观察者都取消订阅Observable是否有事件发生?

Tes*_*ere 2 rx-java

我有一个Observable我想要冷,也就是说,它应该只在第一个观察者订阅它时才开始发射物品.

然后,我想确保在所有观察者取消订阅同一个observable时从源中释放所有资源.那可能吗?

Jos*_*ozo 5

您可以使用ConnectedObservable的功能来为您处理:

//Replace Observable.range(1,1000) with your Observable implementation
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share();
Run Code Online (Sandbox Code Playgroud)

share方法调用方法publishrefCount.

publish将您的"普通"Observable转换为ConnectedObservable,它将在您调用时开始发出项目connect.因此,您可以在技术上订阅任意数量的观察者,然后调用connect以同时开始为所有观察者发射项目.

refCount将您的ConnectedObservable再次转换为传统的,但具有新特性!增加的好处是:这个observable现在很冷(只有在订阅者订阅时才开始发出,在内部调用connect创建的原始ConnectedObservable 的方法publish),并跟踪有多少订阅者连接到原始的ConnectedObservable.一旦所有订阅者都取消订阅,它将取消订阅源ConnectedObservable,因此逻辑变得更加简单,因为您只需要处理一个订阅.

这里有一个很好的共享操作图:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()

或者,如果这不够灵活,我认为您应该能够通过使用defer以创建冷可观察对象doOnSubscribedoOnUnsubscribe方法来轻松实现此行为.

例:

    Observable.defer(() -> {
        final AtomicInteger counter = new AtomicInteger();
        return Observable.range(1, 1000)
                .doOnSubscribe(() -> counter.incrementAndGet())
                .doOnUnsubscribe(() -> {
                    if (counter.decrementAndGet() == 0) {
                        freeResources();
                    }
                });
    });
Run Code Online (Sandbox Code Playgroud)

一旦第一个订阅者订阅,这个observable将开始发出一系列数字(用你的可观察实现替换它),它将增加每个订阅的计数器,并在所有订阅者取消订阅后释放使用过的资源(替换freeResources用于任何你需要).