Mar*_*tin 20 caching rxjs rxjs5 angular
我正在使用RsJS 5(5.0.1)在Angular 2中缓存.它运行良好.
缓存功能的核心是:
const observable = Observable.defer(
() => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
)
.publishReplay(1, this.RECACHE_INTERVAL)
.refCount().take(1)
.do(() => this.console.log('CACHE HIT', cacheKey));
Run Code Online (Sandbox Code Playgroud)
该actualFn
是this.http.get('/some/resource')
.
就像我说的,这对我来说非常合适.缓存从observable返回持续时间RECACHE_INTERVAL
.如果在该间隔之后发出请求,actualFn()
则将调用该请求.
我想弄清楚的是当RECACHE_INTERVAL
到期和actualFn()
被调用时 - 如何返回最后一个值.在RECACHE_INTERVAL
到期和actualFn()
重放之间有一段时间,observable不返回值.我想及时消除这个差距,并始终返回最后一个值.
我可以使用副作用并.next(lastValue)
在等待HTTP响应返回时存储最后一个好的值调用,但这似乎很幼稚.我想使用"RxJS"方式,一种纯函数解决方案 - 如果可能的话.
更新的答案:
如果在发出新请求时总是想使用以前的值,那么可以在链中放置另一个主题来保留最新的值。
然后,您可以重复该值,以便可以判断它是否来自缓存。如果订阅者对缓存的值不感兴趣,则可以过滤掉这些值。
// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
src
.flatMap(v => Observable.from([v, v]))
.takeWhile((v, index) =>
index % 2 === 0 ? true : predicate(v, index)
)
.filter((v, index) => index % 2 !== 1);
// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
Observable.create(subscriber => {
src.subscribe(subject);
return subject.subscribe(subscriber);
});
const cachedRequest = request
// Subjects below only store the most recent value
// so make sure most recent is marked as 'fromCache'
.flatMap(v => Observable.from([
{fromCache: false, value: v},
{fromCache: true, value: v}
]))
// Never complete subject
.concat(Observable.never())
// backup cache while new request is in progress
.let(keepHot(new ReplaySubject(1)))
// main cache with expiry time
.let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
.publish()
.refCount()
.let(takeWhileInclusive(v => v.fromCache));
// Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
// Subscribers will get the most recent cached value first then an updated value
Run Code Online (Sandbox Code Playgroud)
https://acutmore.jsbin.com/kekevib/8/edit?js,console
原答案:
您可以更改源可观察对象以在延迟后重复,而不是在 replaySubject 上设置窗口大小。
const observable = Observable.defer(
() => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
)
.repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
.publishReplay(1)
.refCount()
.take(1)
.do(() => this.console.log('CACHE HIT', cacheKey));
Run Code Online (Sandbox Code Playgroud)
操作员repeatWhen
需要 RxJs-beta12 或更高版本
https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09
归档时间: |
|
查看次数: |
1427 次 |
最近记录: |