RXJS 出错后继续 concat subscribe

Zan*_*man 8 rxjs typescript rxjs5 rxjs6

我有一系列需要按顺序触发的 observable。一旦发生错误,我需要捕获错误,记录它并继续观察。

目前,一旦发生错误,观察者就会停止。观察者必须继续并且不会因错误而重新启动或完成。

import * as Rx from "rxjs";
const source = [
  Rx.Observable.from("1").delay(200),
  Rx.Observable.from("2").delay(150),
  Rx.Observable.throw("error"),
  Rx.Observable.from("3").delay(124),
  Rx.Observable.from("4").delay(201),
];
let sSource = Rx.Observable.concat(...source);
sSource.subscribe((v) => {console.log(v)}, (e) => {console.log(e)});
Run Code Online (Sandbox Code Playgroud)

电流输出:

1
2
error
Run Code Online (Sandbox Code Playgroud)

预期输出:

1
2
error
3
4
Run Code Online (Sandbox Code Playgroud)

我们能想出的唯一解决方案是预先循环遍历sourceobservable 并单独向它们添加 catch 处理程序,然后一旦发生错误,它就会得到正确处理,观察者可以继续操作,而无需完成整个串联的 observable。

我们觉得应该有一个更优雅的解决方案。如果需要,我会发布我们目前的解决方案。

car*_*ant 7

您可以将catch运算符应用于每个源 observables,并可以在其中执行错误日志记录。像这样:

const sources = [
  Rx.Observable.from("1").delay(200),
  Rx.Observable.from("2").delay(150),
  Rx.Observable.throw("error"),
  Rx.Observable.from("3").delay(124),
  Rx.Observable.from("4").delay(201),
];
const sourcesWithCatch = sources.map(s => s.catch(e => {
  console.log(e);
  return Rx.Observable.empty();
}));
const concatted = Rx.Observable.concat(...sourcesWithCatch);
concatted.subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)