计算一段时间内的事件并在 RxJS 中每秒产生一次总和

Tob*_*oby 5 javascript rxjs

是否可以对一段时间内的事件进行计数并在 RxJS 中每秒产生一次总和?我有一个连续的永无止境的事件流。每 1 秒我想获得过去 5 分钟窗口中的事件总数。这个想法是使用它来填充实时图形。

我知道如何以传统方式做到这一点,但真的很想了解它是如何通过反应式编程完成的。

Bra*_*don 5

这是我的解决方法。

创建一个可观察对象,仅计算接收到的事件数量并将其作为运行总数发出(通过scan)。创建第二个可观察量,它只是延迟了 5 分钟的运行总计。创建第三个可观察量,它只是从第一个可观察量中减去延迟的可观察量。这将产生小于 5 分钟的事件的运行总数。创建一个最终的可观察量,每秒对第三个可观察量进行一次采样。

const totalLast5Minutes = eventSource.publish(events => {
    const runningTotal = events
        .scan((e, total) => total + 1, 0)
        .startWith(0);
    const totalDelayed5Minutes = runningTotal
        .delay(5000 * 60)
        .startWith(0);
    return Rx.Observable
        .combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});

// only sample the value once per second
Rx.Observable
    .interval(1000)
    .withLatestFrom(totalLast5Minutes, (interval, total) => total)
    .subscribe(total => console.log(`total=${total}`));
Run Code Online (Sandbox Code Playgroud)