如何对我的ReplaySubject的所有发射应用延迟?

Ste*_*art 1 rxjs typescript angular

我想ReplaySubject在我的Angular应用中制作一个,并对其进行设置,以使观察者订阅它和收到更新之间总是存在延迟。

let delayedReplay = new ReplaySubject(1);

delayedReplay.subscribe((data) => {
  console.log('Got:', data);
});

delayedReplay.next('Test');
Run Code Online (Sandbox Code Playgroud)

我想要的是对ReplaySubject本身施加延迟,以便上面的代码将'Got: Test'在1秒后记录。

ggr*_*nig 5

你得管Subjectdelay和使用所产生的Observable订阅,同时使用原来的Subject发射。

看起来像这样:

let replay = new ReplaySubject(1);

let delayedReplay = replay.pipe(delay(1000));

delayedReplay.subscribe((data) => {
  console.log('Got:', data);
});

replay.next('Test');
Run Code Online (Sandbox Code Playgroud)

这在所有情况下都应该有效,但是正如此注释中所述,它lift对于主题也是可能的,因为lift实例化一个,因此将结果转换回主题AnonymousSubject。尽管您可能必须知道Subject的实现以确保类型转换有效,但这可能会导致某些抽象泄漏。

如果要使用它,该解决方案如下所示:

let delayedReplay = <ReplaySubject> new ReplaySubject(1).delay(1000);

// without rxjs-compat in RxJS 6+: 
// let delayedReplay = <ReplaySubject> new ReplaySubject(1).lift(new DelayOperator(1000));

delayedReplay.subscribe((data) => {
    console.log('Got:', data);
});

delayedReplay.next('Test');
Run Code Online (Sandbox Code Playgroud)

请注意,lift可以在RxJS 7中将其删除。

现在,最后一个选择是扩展ReplaySubject,因此您不必键入cast。请注意,这将增加实现与RxJS之间的耦合,并忽略可组合管道的优势。

它可能看起来像这样:

class DelayedReplaySubject<T> extends ReplaySubject<T> {
    constructor(buffer: number, private delay: number) {
        super(delay);
    }

    next(value?: T): void {
        of(value)
            .pipe(delay(this.delay))
            .subscribe(val => super.next(val));
    }
}
Run Code Online (Sandbox Code Playgroud)