如何在RxJava中的自定义Observable中获得观察者取消订阅操作的通知

vic*_*orx 17 observable rx-java

我正在尝试将一些基于侦听器模式的API包装到Observable中.我的代码大致如下.

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
})
Run Code Online (Sandbox Code Playgroud)

但是,当观察者调用subscription.unscribe()时,我希望我的observable立即从底层的existingEventSource中删除监听器.我怎么能实现这个目标?

Mig*_*gne 29

Subscriber抽象类实际上有一个方法add可以让你添加SubscriptionS的将取消订阅与订户.

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Adds a lambda to be executed when the Subscriber un-subscribes from your Observable
    aSubscriber.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})
Run Code Online (Sandbox Code Playgroud)

想想订阅你aSubscriberObserver那个Observable; 我们称之为Subscriber.只要Subscriber仍然订阅了Observable,就Observable可以发出值.但是当那个Subscriber未订阅的时候它应该停止.但是如果我们想在Subscriber取消订阅时收到通知,我们可以注册一个Action在它发生时运行.这就是该add方法的用途.正如@dwursteisen在评论中提到的那样; 你基本上注册了一个lambda,它将在订阅者取消订阅时执行.

也可以在不同的调度程序上取消订阅订阅.见MainThreadSubscriptionrxanroid项目如何做到这一点的例子.

这是一个如何使用它来取消订阅主线程的示例

aSubscriber.add(new MainThreadSubscription() {
    @Override
    protected void onUnsubscribe() {
        existingEventSource.removeListener(listener);
    }
});
Run Code Online (Sandbox Code Playgroud)

  • 现在如何在RxJava2中完成? (4认同)