如何使用/控制 RxJava Observable.cache

AK_*_*_92 4 reactive-programming rx-android rx-java2

我正在尝试使用 RxJava 缓存机制( RxJava2 ),但我似乎无法理解它是如何工作的,或者我如何控制缓存的内容,因为有操作符cache

我想在发出新数据之前在某些条件下验证缓存的数据。

例如

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache() 

Run Code Online (Sandbox Code Playgroud)

我如何检查和过滤缓存值并在成功时发出它,如果不成功,我将请求一个新值。

由于该值会定期更改,因此我需要先验证缓存是否仍然有效,然后才能请求新的缓存。

还有ObservableCache<T>类,但我找不到任何使用它的资源。

任何帮助将非常感激。谢谢。

Han*_*rst 5

这不是重播/缓存的工作原理。请先阅读#replay/#cache 文档。

重播

该运算符返回一个 ConnectableObservable,它具有一些用于连接到源的方法 (#refCount/ #connect/ #autoConnect)。

当在没有过载的情况下应用 #replay 时,源订阅将被多播,并且所有发出的值都会被重播。源订阅是惰性的,可以通过#refCount/ #connect/ #autoConnect 连接到源。

返回一个 ConnectableObservable,它共享对底层 ObservableSource 的单个订阅,该订阅将向任何未来的观察者重放其所有项目和通知。

在没有任何连接方法的情况下应用 #relay (#refCount/ #connect/ #autoConnect) 不会在订阅上发出任何值

Connectable ObservableSource 类似于普通的 ObservableSource,不同之处在于它不会在订阅时开始发射项目,而是仅在调用其 connect 方法时才开始发射项目。

重播(1)#autoConnect(-1) / #refCount(1) / #connect

应用 replay(1) 将缓存最后一个值,并在每个订阅上发出缓存的值。#autoConnect 将立即打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。#refCount 是类似的,但当所有订阅者消失时,将与源断开连接。当您需要等待、对可观察对象完成所有订阅时,可以使用 #connect 操作符,以免错过值。

用法

#replay(1) —— 大部分应该在可观察的末尾使用。

sourcObs.
  .filter()
  .map()
  .replay(bufferSize)
  .refCount(connectWhenXSubsciberSubscribed) 
Run Code Online (Sandbox Code Playgroud)

警告

当您观察到无限时,应用没有缓冲区限制或到期日期的 #replay 将导致内存泄漏

缓存/cacheWithInitialCapacity

运算符与带有 autoConnect(1) 的 #replay 类似。运营商将缓存每个值并在每个订阅上重播。

仅当第一个下游订阅者订阅并维护对此 ObservableSource 的单个订阅时,操作员才会订阅。相反,返回 ConnectableObservable 的 replay() 运算符系列需要显式调用 ConnectableObservable.connect()。注意:当您使用缓存观察者时,您会牺牲处理源的能力,因此请注意不要在 ObservableSource 上使用此观察者,因为它会发出无限或大量的项目,从而耗尽内存。一种可能的解决方法是takeUntil在应用cache()之前(也许之后)应用谓词或另一个源。

例子

    @Test
    fun skfdsfkds() {
        val create = PublishSubject.create<Int>()

        val cacheWithInitialCapacity = create
            .cacheWithInitialCapacity(1)

        cacheWithInitialCapacity.subscribe()

        create.onNext(1)
        create.onNext(2)
        create.onNext(3)

        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
    }
Run Code Online (Sandbox Code Playgroud)

用法

当您无法控制连接阶段时,使用缓存运算符

当您希望 ObservableSource 缓存响应并且无法控制所有观察者的订阅/处置行为时,这非常有用。

警告

与 replay() 一样,缓存是无限的,可能会导致内存泄漏。

注意:容量提示不是缓存大小的上限。为此,请考虑将 replay(int) 与 ConnectableObservable.autoConnect() 或类似方法结合使用。

进一步阅读

https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/

https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/