仅使用 RxJS 运算符缓存 Http 请求

Séb*_*ZAT 6 caching rxjs rxjs-pipeable-operators

我正在尝试实现此处描述的内容:https ://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular

换句话说,我想在给定时间(假设 1 分钟)内缓存一个可观察值。当在给定时间之后进行订阅时,应再次检索数据并再次缓存 1 分钟。

预期结果示例:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)

shareReplay 运算符可以正常工作,可以在给定时间内缓存数据,但在给定时间过后我无法重新启动它。

使用 shareRelay(1, 1000) 运算符的示例:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response
Run Code Online (Sandbox Code Playgroud)

上面的链接尝试使用第一个运算符捕获空结果来更改该行为。不幸的是,它不能正常工作,因为第一次之后数据不会被缓存。

这是我使用上面链接的文章得到的内容(下图描述了所使用的代码)

代码详情

我得到的结果:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)

我还看到了一些使用计时器运算符的示例,但在这种情况下,即使没有订阅,每分钟都会检索数据。我不想每分钟刷新数据,我想每分钟让缓存过期。不幸的是,我丢失了计时器运算符的代码,但结果是这样的:

定时器运算符的结果:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache
Run Code Online (Sandbox Code Playgroud)

有人有“纯”RxJS 解决方案来做我想做的事吗?

And*_*tej 2

我认为您提供的链接的解决方案有一个小错误,正如我试图在StackBlitz中强调的那样。(或者我可能误解了这个想法)

你可以试试这个:

const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
    switchMap(() => service.fetchData())
  ).pipe(share());

merge(
  src$,
  refetchData$
).pipe(
  shareReplay(1, 1000),
  buffer(concat(timer(0), refetchData$)),
  tap(values => !values.length && refetchSbj.next()),
  filter(values => values.length !== 0),
  // In case there is only one value,
  map(([v]) => v),
  // Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
  take(1)
)
Run Code Online (Sandbox Code Playgroud)

shareReplay内部使用 a ReplaySubject,它将所有缓存的值同步发送给新订阅者。timer(0)与 类似setTimeout(fn, 0),但这里的重要方面是它是异步的,这允许buffer收集 发出的值ReplaySubject

buffer(concat(timer(0), refetchData$)),- 我们希望确保提供给的内部可观察量buffer未完成,否则整个流将完成。refetchData$在这种情况下,将发出新获取的数据(稍后我们会看到)。

tap(values => !values.length && refetchSbj.next())- 如果没有发出任何值,则意味着ReplaySubjectin use 没有任何值,这意味着时间已经过去。如果是这种情况,在 的帮助下refetchSbj,我们可以重新填充缓存。


这就是我们可视化流程的方式:

T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.

T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached

T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)