RxJava Observable.cache无效

fed*_*aol 15 android rx-java

我想在Android环境中学习rxjava.假设我有一个可观察到的网络调用结果.如果我理解正确,处理配置更改的一种常见方法是:

  • 将observable存储在保留的片段/单例/应用程序对象中

  • 缓存运算符应用于observable

  • 订阅/取消订阅适当的生命周期处理程序

这样做,我们不会松开一旦新配置发生后将重新观察到的observable的结果.

现在,我的问题是:

有没有办法强制observable发出一个新值(并使缓存的值无效)?每次我需要来自网络的新数据时,我是否需要创建一个新的observable(这听起来不像Android世界中的坏习惯,因为这会让gc做额外的工作)?

非常感谢,

费德里科

Dav*_*ten 20

制作OnSubscribe满足您需求的自定义实现:

public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

    private final AtomicBoolean refresh = new AtomicBoolean(true);
    private final Observable<T> source;
    private volatile Observable<T> current;

    public OnSubscribeRefreshingCache(Observable<T> source) {
        this.source = source;
        this.current = source;
    }

    public void reset() {
        refresh.set(true);
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        if (refresh.compareAndSet(true, false)) {
            current = source.cache();
        }
        current.unsafeSubscribe(subscriber);
    }

}
Run Code Online (Sandbox Code Playgroud)

这段代码演示了用法,并显示缓存基本上被重置:

Observable<Integer> o = Observable.just(1)
        .doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher = 
    new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

输出:

completed
1
1
completed
1
Run Code Online (Sandbox Code Playgroud)

顺便提一下,你可能会注意到.cache直到完成才会发出.这是一个应该由rxjava 1.0.14修复的错误.

就GC压力问题而言,应用于Observable的每个操作符通常通过lift或创建一个新的Observable create.与创建新Observable相关联的基本成员状态是对该onSubscribe函数的引用.cache与大多数人不同的是,它在订阅中保持状态,如果它拥有大量状态并且经常被抛弃,这就有可能导致GC压力.即使您使用相同的可变数据结构来保持跨重置的状态,GC在清除时仍然必须处理数据结构的内容,因此您可能不会获得太多收益.

RxJava cache运算符是为多个并发订阅而构建的.您可以想象重置功能可能会导致实施问题.如果你想进一步探索,请务必在RxJava github上提出一个问题.