Joã*_*ann 5 javascript rxjs reactivex
我正在使用ReactiveX/RxJS版本.
假设我有一个Rx.ReplaySubject,它每2秒发出一个包含id的对象和一个包含值的数组.我想减少这个值数组并得到它们的总和.
问题是ReplaySubject是一个热的可观察对象并且它永远不会完成,至少我不希望它完成,因为我想要每2秒对该对象的值求和.但是为了使用reduce运算符,应该完成observable.那么,我该怎么办?
EG不工作代码:
var subject = new Rx.ReplaySubject();
subject.
map(x => x.transactions).
// Reduce never concludes because ReplaySubject instance is not completed
reduce((v1, v2) => v1+v2, 0).
subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.next({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
Run Code Online (Sandbox Code Playgroud)
考虑使用Observable.prototype.scan()(RxJS文档)
scan()基本上聚合一个可观察的并发出每个连续的值,不同之处reduce()仅在完成时发出结果.(参见Rx解释扫描和减少)
使用OP代码的示例(这是小提琴):
var subject = new Rx.ReplaySubject();
subject
// note: use "selectMany" to flatten observable of observables
.selectMany(x => Rx.Observable.fromArray(x.transactions))
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
Run Code Online (Sandbox Code Playgroud)
另一个例子:
上面的代码为每个事务发出聚合,因为selectMany().如果你只想让它每2秒钟发出一次,这是一个很好的时间来使用reduce()(这是另一个小提琴):
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});
Run Code Online (Sandbox Code Playgroud)
附加说明:
Rx科目可以完成; 你需要onCompleted()在准备好时打电话.如果您完成主题,您仍然可以使用reduce().将这个小提琴与上面的小提琴进行比较.