RxJS 主题有条件的 next() 更新,基于当前新值

Raf*_*l R 2 rxjs angular

我有一个角度应用程序,它在一个主题中存储一些数据

dataSubject = new Subject<SomeDataType>();
Run Code Online (Sandbox Code Playgroud)

我有两种不同的方式从服务器更新数据,它们看起来像

connection1.on('messageReceived', (newData: SomeDataType) => this.dataService.dataSubject.next(newData))
Run Code Online (Sandbox Code Playgroud)
connection2.on('messageReceived', (newData: SomeDataType) => this.dataService.dataSubject.next(newData))
Run Code Online (Sandbox Code Playgroud)

问题是最后到达的 newData 不一定是最新的。每个数据对象都包含一个我想使用的时间戳。

有没有办法让主题只接受具有 newData.timestamp > currentData.timestamp 的值?类似于条件 next()

bry*_*n60 5

听起来像是scan运营商的案例:

latestData$ = this.dataSubject.pipe(
  // scan will give you the last emitted value and the current value coming through
  // so you can compare them and select which to emit
  scan((last, current) => {
    if (last.timestamp > current.timestamp) {
      return last
    }
    return current;
  }),
  // optional operator to prevent emitting the same value twice 
  distinctUntilChanged()
);
Run Code Online (Sandbox Code Playgroud)

latestData$ 仅当时间戳大于使用此设置的先前发出的值时才会发出值。