我正在通过一个简单的 ajax 调用加载一个初始数据数组,并将该数据放入 Observable 中,我将其称为history。同时,我连接到 websocket 并定期接收数据,我们将其称为updates,我想将此数据附加到history。
具体而言,假设Ajax调用发送回阵列[0,1,2]和插座发出(一段时间内)3,4,5那么我希望积累像这样这些值:
[0,1,2] // historical
[0,1,2,3] // historical + updates1
[0,1,2,3,4] // historical + updates1 + updates2
[0,1,2,3,4,5] // etc
Run Code Online (Sandbox Code Playgroud)
(请注意,这里有一个并发边缘情况需要处理:历史收益率[0,1,2,3]和前两个更新可能是3and 4,在这种情况下,我想要的结果仍然是[0,1,2,3,4]- NOT [0,1,2,3,3,4]。)
最终目标是得到一个单一的 Observable流,它是所描述的 Observables历史和更新的组合。
仅累积 websocket 数据很容易。我创建了updates,这是 websocket 发出的 Observable 序列。每次观察到一个值时,我可以将它累积到数组中,使用scan():
updates.scan((acc, update) => acc.concat([update]), [])
Run Code Online (Sandbox Code Playgroud)
这会产生类似的东西
[3]
[3,4]
[3,4,5]
Run Code Online (Sandbox Code Playgroud)
我的下一个问题是如何将它与history结合起来。并且由于历史数据可能在已经观察到一个或多个更新之后到达,因此在我们等待历史数据时需要累积这些更新。我设法使用withLatestFrom()以下方法实现了这一目标:
const stream = historical
.withLatestFrom(
updates.scan((acc, update) => acc.concat([update]), []),
(history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
)
Run Code Online (Sandbox Code Playgroud)
观察流产生一个单一的值,[0,1,2,3,4,5]它是历史和任何在历史之前到达的更新的组合。正是我想要的。
但是,我不知道从那里去哪里。我怎样才能继续向流追加更新,以便随着时间的推移,流产生如下结果:
[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]
Run Code Online (Sandbox Code Playgroud)
我没有看到一种scan用于此的方法,就像我为updates所做的那样,因为在这种情况下,我需要scan的初始(种子)值是 Observable,而不是 Array。
有没有办法做到这一点 - 要么通过添加我目前拥有的东西,要么用更好的替代方法来完成整个事情?
如果我理解正确,我会使用skipUntil()运算符来继续收集更新而不进一步发出更新。然后对于withLatestFrom()操作员,我会选择updatesObservable 作为其来源。这将等待,skipUntil()直到历史数据可用,然后从updates.
let updates = Observable
.timer(0, 1000)
.scan((acc, update) => {
acc.push(update);
return acc;
}, []);
let historical = Observable.defer(() => {
console.log('Sending AJAX request ...');
return Observable.of(['h1', 'h2', 'h3']);
})
.delay(3000)
.share();
const stream = updates.skipUntil(historical)
.withLatestFrom(historical, (buffer, history) => {
return history.concat(buffer);
})
.map(val => val) // remove duplicates;
stream.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)
控制台输出如下:
Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)
看现场演示:https : //jsbin.com/kumolez/11/edit?js,console
我不知道你的用例是什么,但我会尽量避免使用,concat()因为它在buffer增长时可能会变慢。
此外,如果您在更新到达一项时发出更新(而不是累积它们),您可以使用distinct()运算符过滤掉重复项。
顺便说一句,我假设您使用的是 RxJS 5。
| 归档时间: |
|
| 查看次数: |
1252 次 |
| 最近记录: |