rxjs5合并和错误处理

nul*_*ull 5 rxjs rxjs5

我想合并/合并多个Observable,当每个Observable完成时,执行finally函数。该merge操作似乎在并行执行每个订阅,这正是我需要的,但如果其中任何引发错误的停止执行。

RxJS 版本4具有一个操作符mergeDelayError,该操作符应使所有订阅都一直执行到所有订阅都完成为止,但是该操作符在版本5中未实现。

我应该恢复为其他运营商吗?

var source1 = Rx.Observable.of(1,2,3).delay(3000);
var source2 = Rx.Observable.throw(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6).delay(1000);

// Combine the 3 sources into 1 

var source = Rx.Observable
  .merge(source1, source2, source3)
  .finally(() => {

    // finally is executed before all 
    // subscriptions are completed.

    console.log('finally');

  }); 

var subscription = source.subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));
Run Code Online (Sandbox Code Playgroud)

联合会

Lau*_*owe 5

我们可以通过收集错误并在最后发出它们来避免阻塞流。

function mergeDelayError(...sources) {
  const errors = [];
  const catching = sources.map(obs => obs.catch(e => {
    errors.push(e);
    return Rx.Observable.empty();
  }));
  return Rx.Observable
    .merge(...catching)
    .concat(Rx.Observable.defer(
      () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors)));
}


const source1 = Rx.Observable.of(1,2,3);
const source2 = Rx.Observable.throw(new Error('woops'));
const source3 = Rx.Observable.of(4,5,6);

mergeDelayError(source1, source2, source3).subscribe(
  x => console.log('next:', x),
  e => console.log('error:', e),
  () => console.log('completed'));
Run Code Online (Sandbox Code Playgroud)


mar*_*tin 2

我认为您可以使用 来模拟相同的行为catch()。您只需将其附加到每个可观察源:

const sources = [source1, source2, source3].map(obs => 
  obs.catch(() => Observable.empty())
);

Rx.Observable
  .merge(sources)
  .finally(...)
  ...
Run Code Online (Sandbox Code Playgroud)

  • 不幸的是,这实际上并没有延迟错误,而是吞噬了错误。 (2认同)