Rxjs 在源发出值之后缓冲指定时间的发出值

rod*_*_la 7 rxjs

source$如果有一些事件触发,我有一个可观察的收集数据流。我想将指定时间发生的这些数据收集到数组中。

const eventSubject = new Subject();
eventSubject.next(data); 

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$)).subscribe(
    data => {
      console.log(data);
    }
);
Run Code Online (Sandbox Code Playgroud)

上面source$立即处理发出的数据。

现在我想改进这一点,等待几秒钟并收集在指定时间内发生的所有数据并发出一次。所以我修改为使用bufferTime如下:

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$), bufferTime(2000)).subscribe(
    data => {
      console.log(data);
    }
);
Run Code Online (Sandbox Code Playgroud)

经过测试后bufferTime,我发现即使源没有接收数据,它也会每 2 秒发出一次。如果源没有接收数据,它会发出空对象。

我想要的是只有在source$接收数据时,然后开始缓冲2s,然后发出值。如果 source$ 没有接收数据,它不应该发出任何东西。

我检查了bufferWhen,,windowWhenwindowTime都符合我的要求。它们在每个指定的时间间隔发出信号。

有其他运营商可以做我想做的事吗?

多谢。

Fan*_*ung 1

您只需添加一个filter运算符即可忽略空对象发射

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$), bufferTime(2000),filter(arr=>arr.length)).subscribe(
    data => {
      console.log(data);
    }
);
Run Code Online (Sandbox Code Playgroud)