如何在最后一个订阅者取消订阅后延迟拆除共享的无限Observable

And*_*ger 6 android rx-java rx-android

我们在Android App中使用mutliple服务.这些服务提供的数据是无限的Observables,通常通过组合Observables其他服务来构建.这些的构造Observables可能是昂贵的.此外,服务通常在多个地方消费,因此它们Observable应该在订户之间共享.

例:

  • LocationService,提供无限Observable<Location>,发出当前位置
  • ReminderService,提供无限Observable<List<Reminder>>,在数据集中的每次更改后发出所有存储的提醒的列表
  • LocationAwareReminderService,提供了一个无限Observable<List<Reminders>>的附近提醒Observable.combineLatestObservables前两个服务

第一种方法:内部BehaviorSubjects作为缓存

每个服务都结合了消费者Observables并将其内部订阅BehaviorSubject到生成的Feed中.消费者可以订阅这个BehaviorSubject.该LocationAwareReminderService例如:

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
        Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).subscribe(cache);

        feed = cache.asObservable();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}
Run Code Online (Sandbox Code Playgroud)

坏处:

  • 因为行为主题,提醒服务和locatoinService的提要永远不会被删除.即使没有消费者
  • 如果它们依赖于经常发布新项目的服务(如LocationService),则这尤其成问题
  • 由于构造函数中的订阅(缓存),即使没有订户存在,服务也会开始计算附近的提醒

优点:

  • 生成的Feed由所有订阅者共享
  • 因为饲料从未被拆除,没有用户的短期不会使整个管道坍塌

第二种方法:重放(1).refCount().

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).replay(1).refCount();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}
Run Code Online (Sandbox Code Playgroud)

坏处:

  • 短期Subscriber内整个管道没有倒塌.在下一个订阅期间,需要重建整个管道.
  • ActivityA到ActivityB的转换(均订阅)LocationAwareReminderService.getFeed()导致管道的完全去除和重建

优点:

  • 在最后一次Subscriber取消订阅之后,LocationAwareReminderService也将取消订阅LocationService.getFeed()reminderService.getFeed() Observables.
  • LocationAwareReminderService只开始后的第一个提供nearbyReminders Subscriber订阅
  • 生成的Feed由所有Subscribers 共享

第三种方法:使用超时取消订阅refCount

因此,我构建了Transformer一个在最后一次Subscriber取消订阅后将订阅保持活动一段时间

public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {

    private long keepAlive;
    private TimeUnit timeUnit;

    public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
        this.keepAlive = keepAlive;
        this.timeUnit = timeUnit;
    }

    @Override
    public Observable<T> call(Observable<T> upstream) {

        final Observable<T> sharedUpstream = upstream.replay(1).refCount();

        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                if (subscriber.isUnsubscribed())
                    return;
                // subscribe an empty Subscriber that keeps the subsription of refCount() alive
                final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
                // listen to unsubscribe from the subscriber
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        // the subscriber unsubscribed
                        Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long _) {
                                // unsubscribe the keep alive subscription
                                keepAliveSubscription.unsubscribe();
                            }
                        });
                    }
                }));
                sharedUpstream.subscribe(subscriber);
            }
        });
    }

    public class NopSubscriber<T> extends Subscriber<T> {
        @Override
        public void onCompleted() {}
        @Override
        public void onError(Throwable e) {}
        @Override
        public void onNext(T o) {}
    }
}
Run Code Online (Sandbox Code Playgroud)

LocationAwareReminderService利用RxPublishTimeoutCache

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}
Run Code Online (Sandbox Code Playgroud)

优点:

  • LocationAwareReminderService只开始后的第一个提供nearbyReminders Subscriber订阅
  • 生成的Feed由所有订阅者共享
  • 没有用户的短期不会使整个管道坍塌
  • 在规定的时间内没有订阅后,整个管道将被拆除

坏处:

  • 也许是一些普遍的缺陷?

问题:

  • 在RxJava中是否已经有其他方法可以实现这一目标?
  • 是否存在一些通用的设计缺陷RxPublishTimeoutCache
  • 用RxJava构建此类服务的整体策略是否存在缺陷?

Dav*_*ten 2

我认为这是一个有趣的问题,并且似乎是一个有用的运算符,所以我Transformers.delayFinalUnsubscriberxjava-extras中做了:

observable
  .publish()
  .refCount()
  .compose(Transformers
      .delayFinalUnsubscribe(1, TimeUnit.MINUTES));
Run Code Online (Sandbox Code Playgroud)

它在 Maven Central 上的 0.7.9.1 版本的 rxjava-extras 中可用。如果您愿意,请尝试一下,看看是否有任何问题。