RxJS如何忽略catch的错误并继续前进

dev*_*guy 9 javascript error-handling exception rxjs5

嗨,我有以下代码,我想知道如何防止主(上游)Observable在抛出错误时被删除.

如何更改以下代码以便显示所有期望"4"的数字?

我正在寻找一种通用模式解决方案,可以在其他情况下使用不同的运算符.这是我能提出的最简单的案例.

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch(err => Rx.Observable.empty())
  .subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)

Ber*_*nez 12

您将希望保持源可观察性运行,但如果您在主事件流上发生错误,它将折叠整个可观察对象,您将不再接收项目.

解决方案涉及创建一个独立的流,您可以在其中过滤和捕获,而不会让上游管道崩溃.

const Rx = require('rxjs/Rx');
function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source
  // pass the item into the projection function of the switchMap operator
  .switchMap(x => {
     // we create a new stream of just one item
     // this stream is created for every item emitted by the source observable
     return Observable.of(x)
       // now we run the filter
       .filter(checkValue)
       // we catch the error here within the projection function
       // on error this upstream pipe will collapse, but that is ok because it starts within this function and will not effect the source
       // the downstream operators will never see the error so they will also not be effect
       .catch(err => Rx.Observable.empty());
     })
     .subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)

您还可以使用传递给catch选择器的第二个参数来重新启动可观察源,但这将启动它,就像它之前没有运行一样.

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch((err, source) => source)
  .subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)

但这并没有达到预期的效果.您将看到一个流重复发出1..3直到时间结束...或者您关闭脚本.以先到者为准.(这是必不可少的.retry())


zin*_*ndo 0

您需要使用 flatMap 运算符来进行过滤。在此示例中的 flatMap 中,我使用 Observable.if() 进行过滤,因为它保证我始终返回可观察值。我确信您可以通过其他方式做到这一点,但这对我来说是一个干净的实现。

const source = Rx.Observable.interval(100).take(10).flatMap((x)=>
    Rx.Observable.if(() => x !== 4, 
    Rx.Observable.of(x),
    Rx.Observable.throw("Bad value"))
    .catch((err) => {
        return Rx.Observable.empty()
    })
);

source.subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)