HTTP服务的反应性缓存

Mar*_*tin 20 caching rxjs rxjs5 angular

我正在使用RsJS 5(5.0.1)在Angular 2中缓存.它运行良好.

缓存功能的核心是:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount().take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
Run Code Online (Sandbox Code Playgroud)

actualFnthis.http.get('/some/resource').

就像我说的,这对我来说非常合适.缓存从observable返回持续时间RECACHE_INTERVAL.如果在该间隔之后发出请求,actualFn()则将调用该请求.

我想弄清楚的是当RECACHE_INTERVAL到期和actualFn()被调用时 - 如何返回最后一个值.在RECACHE_INTERVAL到期和actualFn()重放之间有一段时间,observable不返回值.我想及时消除这个差距,并始终返回最后一个值.

我可以使用副作用并.next(lastValue)在等待HTTP响应返回时存储最后一个好的值调用,但这似乎很幼稚.我想使用"RxJS"方式,一种纯函数解决方案 - 如果可能的话.

Ash*_*ley 2

更新的答案:

如果在发出新请求时总是想使用以前的值,那么可以在链中放置另一个主题来保留最新的值。

然后,您可以重复该值,以便可以判断它是否来自缓存。如果订阅者对缓存的值不感兴趣,则可以过滤掉这些值。

// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);

    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

  // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
  // Subscribers will get the most recent cached value first then an updated value
Run Code Online (Sandbox Code Playgroud)

https://acutmore.jsbin.com/kekevib/8/edit?js,console

原答案:

您可以更改源可观察对象以在延迟后重复,而不是在 replaySubject 上设置窗口大小。

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
Run Code Online (Sandbox Code Playgroud)

操作员repeatWhen需要 RxJs-beta12 或更高版本 https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09