Séb*_*ZAT 6 caching rxjs rxjs-pipeable-operators
我正在尝试实现此处描述的内容:https ://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular
换句话说,我想在给定时间(假设 1 分钟)内缓存一个可观察值。当在给定时间之后进行订阅时,应再次检索数据并再次缓存 1 分钟。
预期结果示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)
shareReplay 运算符可以正常工作,可以在给定时间内缓存数据,但在给定时间过后我无法重新启动它。
使用 shareRelay(1, 1000) 运算符的示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response
Run Code Online (Sandbox Code Playgroud)
上面的链接尝试使用第一个运算符捕获空结果来更改该行为。不幸的是,它不能正常工作,因为第一次之后数据不会被缓存。
这是我使用上面链接的文章得到的内容(下图描述了所使用的代码)
我得到的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)
我还看到了一些使用计时器运算符的示例,但在这种情况下,即使没有订阅,每分钟都会检索数据。我不想每分钟刷新数据,我想每分钟让缓存过期。不幸的是,我丢失了计时器运算符的代码,但结果是这样的:
定时器运算符的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache
Run Code Online (Sandbox Code Playgroud)
有人有“纯”RxJS 解决方案来做我想做的事吗?
我认为您提供的链接的解决方案有一个小错误,正如我试图在StackBlitz中强调的那样。(或者我可能误解了这个想法)
你可以试试这个:
const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
switchMap(() => service.fetchData())
).pipe(share());
merge(
src$,
refetchData$
).pipe(
shareReplay(1, 1000),
buffer(concat(timer(0), refetchData$)),
tap(values => !values.length && refetchSbj.next()),
filter(values => values.length !== 0),
// In case there is only one value,
map(([v]) => v),
// Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
take(1)
)
Run Code Online (Sandbox Code Playgroud)
shareReplay内部使用 a ReplaySubject,它将所有缓存的值同步发送给新订阅者。timer(0)与 类似setTimeout(fn, 0),但这里的重要方面是它是异步的,这允许buffer收集 发出的值ReplaySubject。
buffer(concat(timer(0), refetchData$)),- 我们希望确保提供给的内部可观察量buffer未完成,否则整个流将完成。refetchData$在这种情况下,将发出新获取的数据(稍后我们会看到)。
tap(values => !values.length && refetchSbj.next())- 如果没有发出任何值,则意味着ReplaySubjectin use 没有任何值,这意味着时间已经过去。如果是这种情况,在 的帮助下refetchSbj,我们可以重新填充缓存。
这就是我们可视化流程的方式:
T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.
T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
Run Code Online (Sandbox Code Playgroud)