And*_*ger 6 android rx-java rx-android
我们在Android App中使用mutliple服务.这些服务提供的数据是无限的Observables,通常通过组合Observables其他服务来构建.这些的构造Observables可能是昂贵的.此外,服务通常在多个地方消费,因此它们Observable应该在订户之间共享.
LocationService,提供无限Observable<Location>,发出当前位置ReminderService,提供无限Observable<List<Reminder>>,在数据集中的每次更改后发出所有存储的提醒的列表LocationAwareReminderService,提供了一个无限Observable<List<Reminders>>的附近提醒Observable.combineLatest的Observables前两个服务每个服务都结合了消费者Observables并将其内部订阅BehaviorSubject到生成的Feed中.消费者可以订阅这个BehaviorSubject.该LocationAwareReminderService例如:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).subscribe(cache);
feed = cache.asObservable();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Run Code Online (Sandbox Code Playgroud)
坏处:
优点:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).replay(1).refCount();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Run Code Online (Sandbox Code Playgroud)
坏处:
Subscriber内整个管道没有倒塌.在下一个订阅期间,需要重建整个管道.ActivityA到ActivityB的转换(均订阅)LocationAwareReminderService.getFeed()导致管道的完全去除和重建优点:
Subscriber取消订阅之后,LocationAwareReminderService也将取消订阅LocationService.getFeed()和reminderService.getFeed() Observables.LocationAwareReminderService只开始后的第一个提供nearbyReminders Subscriber订阅Subscribers 共享因此,我构建了Transformer一个在最后一次Subscriber取消订阅后将订阅保持活动一段时间
public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {
private long keepAlive;
private TimeUnit timeUnit;
public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
this.keepAlive = keepAlive;
this.timeUnit = timeUnit;
}
@Override
public Observable<T> call(Observable<T> upstream) {
final Observable<T> sharedUpstream = upstream.replay(1).refCount();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed())
return;
// subscribe an empty Subscriber that keeps the subsription of refCount() alive
final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
// listen to unsubscribe from the subscriber
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// the subscriber unsubscribed
Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
@Override
public void call(Long _) {
// unsubscribe the keep alive subscription
keepAliveSubscription.unsubscribe();
}
});
}
}));
sharedUpstream.subscribe(subscriber);
}
});
}
public class NopSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(T o) {}
}
}
Run Code Online (Sandbox Code Playgroud)
在LocationAwareReminderService利用RxPublishTimeoutCache
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
Run Code Online (Sandbox Code Playgroud)
优点:
LocationAwareReminderService只开始后的第一个提供nearbyReminders Subscriber订阅坏处:
RxPublishTimeoutCache?我认为这是一个有趣的问题,并且似乎是一个有用的运算符,所以我Transformers.delayFinalUnsubscribe在rxjava-extras中做了:
observable
.publish()
.refCount()
.compose(Transformers
.delayFinalUnsubscribe(1, TimeUnit.MINUTES));
Run Code Online (Sandbox Code Playgroud)
它在 Maven Central 上的 0.7.9.1 版本的 rxjava-extras 中可用。如果您愿意,请尝试一下,看看是否有任何问题。
| 归档时间: |
|
| 查看次数: |
289 次 |
| 最近记录: |