我想合并/合并多个Observable,当每个Observable完成时,执行finally函数。该merge
操作似乎在并行执行每个订阅,这正是我需要的,但如果其中任何引发错误的停止执行。
RxJS 版本4具有一个操作符mergeDelayError,该操作符应使所有订阅都一直执行到所有订阅都完成为止,但是该操作符在版本5中未实现。
我应该恢复为其他运营商吗?
var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);
// Combine the 3 sources into 1
var source = Rx.Observable
.merge(source1, source2, source3)
.finally(() => {
// finally is executed before all
// subscriptions are completed.
console.log('finally');
});
var subscription = source.subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));
Run Code Online (Sandbox Code Playgroud)
联合会
我们可以通过收集错误并在最后发出它们来避免阻塞流。
function mergeDelayError(...sources) {
const errors = [];
const catching = sources.map(obs => obs.catch(e => {
errors.push(e);
return Rx.Observable.empty();
}));
return Rx.Observable
.merge(...catching)
.concat(Rx.Observable.defer(
() => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}
const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);
mergeDelayError(source1, source2, source3).subscribe(
x => console.log('next:', x),
e => console.log('error:', e),
() => console.log('completed'));
Run Code Online (Sandbox Code Playgroud)
我认为您可以使用 来模拟相同的行为catch()
。您只需将其附加到每个可观察源:
const sources = [source1, source2, source3].map(obs =>
obs.catch(() => Observable.empty())
);
Rx.Observable
.merge(sources)
.finally(...)
...
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1722 次 |
最近记录: |