Ste*_*art 1 rxjs typescript angular
我想ReplaySubject在我的Angular应用中制作一个,并对其进行设置,以使观察者订阅它和收到更新之间总是存在延迟。
let delayedReplay = new ReplaySubject(1);
delayedReplay.subscribe((data) => {
console.log('Got:', data);
});
delayedReplay.next('Test');
Run Code Online (Sandbox Code Playgroud)
我想要的是对ReplaySubject本身施加延迟,以便上面的代码将'Got: Test'在1秒后记录。
你得管Subject有delay和使用所产生的Observable订阅,同时使用原来的Subject发射。
看起来像这样:
let replay = new ReplaySubject(1);
let delayedReplay = replay.pipe(delay(1000));
delayedReplay.subscribe((data) => {
console.log('Got:', data);
});
replay.next('Test');
Run Code Online (Sandbox Code Playgroud)
这在所有情况下都应该有效,但是正如此注释中所述,它lift对于主题也是可能的,因为lift实例化一个,因此将结果转换回主题AnonymousSubject。尽管您可能必须知道Subject的实现以确保类型转换有效,但这可能会导致某些抽象泄漏。
如果要使用它,该解决方案如下所示:
let delayedReplay = <ReplaySubject> new ReplaySubject(1).delay(1000);
// without rxjs-compat in RxJS 6+:
// let delayedReplay = <ReplaySubject> new ReplaySubject(1).lift(new DelayOperator(1000));
delayedReplay.subscribe((data) => {
console.log('Got:', data);
});
delayedReplay.next('Test');
Run Code Online (Sandbox Code Playgroud)
请注意,lift可以在RxJS 7中将其删除。
现在,最后一个选择是扩展ReplaySubject,因此您不必键入cast。请注意,这将增加实现与RxJS之间的耦合,并忽略可组合管道的优势。
它可能看起来像这样:
class DelayedReplaySubject<T> extends ReplaySubject<T> {
constructor(buffer: number, private delay: number) {
super(delay);
}
next(value?: T): void {
of(value)
.pipe(delay(this.delay))
.subscribe(val => super.next(val));
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
169 次 |
| 最近记录: |