RxJS:将历史数据与更新流相结合

mee*_*mit 5 javascript rxjs

设想:

我正在通过一个简单的 ajax 调用加载一个初始数据数组,并将该数据放入 Observable 中,我将其称为history。同时,我连接到 websocket 并定期接收数据,我们将其称为updates,我想将此数据附加到history

具体而言,假设Ajax调用发送回阵列[0,1,2]和插座发出(一段时间内)345那么我希望积累像这样这些值:

[0,1,2]        // historical
[0,1,2,3]      // historical + updates1
[0,1,2,3,4]    // historical + updates1 + updates2
[0,1,2,3,4,5]  // etc
Run Code Online (Sandbox Code Playgroud)

(请注意,这里有一个并发边缘情况需要处理:历史收益率[0,1,2,3]和前两个更新可能3and 4,在这种情况下,我想要的结果仍然是[0,1,2,3,4]- NOT [0,1,2,3,3,4]。)

最终目标是得到一个单一的 Observable,它是所描述的 Observables历史更新的组合。

到目前为止我尝试过的:

仅累积 websocket 数据很容易。我创建了updates,这是 websocket 发出的 Observable 序列。每次观察到一个值时,我可以将它累积到数组中,使用scan()

updates.scan((acc, update) => acc.concat([update]), [])
Run Code Online (Sandbox Code Playgroud)

这会产生类似的东西

[3]
[3,4]
[3,4,5]
Run Code Online (Sandbox Code Playgroud)

我的下一个问题是如何将它与history结合起来。并且由于历史数据可能在已经观察到一个或多个更新之后到达,因此在我们等待历史数据时需要累积这些更新。我设法使用withLatestFrom()以下方法实现了这一目标:

const stream = historical
  .withLatestFrom(
    updates.scan((acc, update) => acc.concat([update]), []),
    (history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
  )
Run Code Online (Sandbox Code Playgroud)

观察产生一个单一的值,[0,1,2,3,4,5]它是历史和任何在历史之前到达的更新的组合。正是我想要的。

但是,我不知道从那里去哪里。我怎样才能继续向追加更新,以便随着时间的推移,产生如下结果:

[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]
Run Code Online (Sandbox Code Playgroud)

我没有看到一种scan用于此的方法,就像我为updates所做的那样,因为在这种情况下,我需要scan的初始(种子)值是 Observable,而不是 Array。

有没有办法做到这一点 - 要么通过添加我目前拥有的东西,要么用更好的替代方法来完成整个事情?

mar*_*tin 5

如果我理解正确,我会使用skipUntil()运算符来继续收集更新而不进一步发出更新。然后对于withLatestFrom()操作员,我会选择updatesObservable 作为其来源。这将等待,skipUntil()直到历史数据可用,然后从updates.

let updates = Observable
  .timer(0, 1000)
  .scan((acc, update) => {
    acc.push(update);
    return acc;
  }, []);

let historical = Observable.defer(() => { 
    console.log('Sending AJAX request ...');
    return Observable.of(['h1', 'h2', 'h3']); 
  })
  .delay(3000)
  .share();


const stream = updates.skipUntil(historical)
  .withLatestFrom(historical, (buffer, history) => {
    return history.concat(buffer);
  })
  .map(val => val) // remove duplicates;


stream.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)

控制台输出如下:

Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)

看现场演示:https : //jsbin.com/kumolez/11/edit?js,console

我不知道你的用例是什么,但我会尽量避免使用,concat()因为它在buffer增长时可能会变慢。

此外,如果您在更新到达一项时发出更新(而不是累积它们),您可以使用distinct()运算符过滤掉重复项。

顺便说一句,我假设您使用的是 RxJS 5。