AK_*_*_92 4 reactive-programming rx-android rx-java2
我正在尝试使用 RxJava 缓存机制( RxJava2 ),但我似乎无法理解它是如何工作的,或者我如何控制缓存的内容,因为有操作符cache。
我想在发出新数据之前在某些条件下验证缓存的数据。
例如
someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache()
Run Code Online (Sandbox Code Playgroud)
我如何检查和过滤缓存值并在成功时发出它,如果不成功,我将请求一个新值。
由于该值会定期更改,因此我需要先验证缓存是否仍然有效,然后才能请求新的缓存。
还有ObservableCache<T>类,但我找不到任何使用它的资源。
任何帮助将非常感激。谢谢。
这不是重播/缓存的工作原理。请先阅读#replay/#cache 文档。
该运算符返回一个 ConnectableObservable,它具有一些用于连接到源的方法 (#refCount/ #connect/ #autoConnect)。
当在没有过载的情况下应用 #replay 时,源订阅将被多播,并且所有发出的值都会被重播。源订阅是惰性的,可以通过#refCount/ #connect/ #autoConnect 连接到源。
返回一个 ConnectableObservable,它共享对底层 ObservableSource 的单个订阅,该订阅将向任何未来的观察者重放其所有项目和通知。
在没有任何连接方法的情况下应用 #relay (#refCount/ #connect/ #autoConnect) 不会在订阅上发出任何值
Connectable ObservableSource 类似于普通的 ObservableSource,不同之处在于它不会在订阅时开始发射项目,而是仅在调用其 connect 方法时才开始发射项目。
应用 replay(1) 将缓存最后一个值,并在每个订阅上发出缓存的值。#autoConnect 将立即打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。#refCount 是类似的,但当所有订阅者消失时,将与源断开连接。当您需要等待、对可观察对象完成所有订阅时,可以使用 #connect 操作符,以免错过值。
#replay(1) —— 大部分应该在可观察的末尾使用。
sourcObs.
.filter()
.map()
.replay(bufferSize)
.refCount(connectWhenXSubsciberSubscribed)
Run Code Online (Sandbox Code Playgroud)
当您观察到无限时,应用没有缓冲区限制或到期日期的 #replay 将导致内存泄漏
运算符与带有 autoConnect(1) 的 #replay 类似。运营商将缓存每个值并在每个订阅上重播。
仅当第一个下游订阅者订阅并维护对此 ObservableSource 的单个订阅时,操作员才会订阅。相反,返回 ConnectableObservable 的 replay() 运算符系列需要显式调用 ConnectableObservable.connect()。注意:当您使用缓存观察者时,您会牺牲处理源的能力,因此请注意不要在 ObservableSource 上使用此观察者,因为它会发出无限或大量的项目,从而耗尽内存。一种可能的解决方法是
takeUntil在应用cache()之前(也许之后)应用谓词或另一个源。
@Test
fun skfdsfkds() {
val create = PublishSubject.create<Int>()
val cacheWithInitialCapacity = create
.cacheWithInitialCapacity(1)
cacheWithInitialCapacity.subscribe()
create.onNext(1)
create.onNext(2)
create.onNext(3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
}
Run Code Online (Sandbox Code Playgroud)
当您无法控制连接阶段时,使用缓存运算符
当您希望 ObservableSource 缓存响应并且无法控制所有观察者的订阅/处置行为时,这非常有用。
与 replay() 一样,缓存是无限的,可能会导致内存泄漏。
注意:容量提示不是缓存大小的上限。为此,请考虑将 replay(int) 与 ConnectableObservable.autoConnect() 或类似方法结合使用。
https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/
https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/
| 归档时间: |
|
| 查看次数: |
5823 次 |
| 最近记录: |