是否可以对一段时间内的事件进行计数并在 RxJS 中每秒产生一次总和?我有一个连续的永无止境的事件流。每 1 秒我想获得过去 5 分钟窗口中的事件总数。这个想法是使用它来填充实时图形。
我知道如何以传统方式做到这一点,但真的很想了解它是如何通过反应式编程完成的。
这是我的解决方法。
创建一个可观察对象,仅计算接收到的事件数量并将其作为运行总数发出(通过scan)。创建第二个可观察量,它只是延迟了 5 分钟的运行总计。创建第三个可观察量,它只是从第一个可观察量中减去延迟的可观察量。这将产生小于 5 分钟的事件的运行总数。创建一个最终的可观察量,每秒对第三个可观察量进行一次采样。
const totalLast5Minutes = eventSource.publish(events => {
const runningTotal = events
.scan((e, total) => total + 1, 0)
.startWith(0);
const totalDelayed5Minutes = runningTotal
.delay(5000 * 60)
.startWith(0);
return Rx.Observable
.combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});
// only sample the value once per second
Rx.Observable
.interval(1000)
.withLatestFrom(totalLast5Minutes, (interval, total) => total)
.subscribe(total => console.log(`total=${total}`));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1895 次 |
| 最近记录: |