RXJS - 捕捉观察者抛出的异常

Pet*_*ens 4 exception rxjs

我是 rxjs 的新手并且正在努力处理异常。

采取以下代码

let sub = new Subject();

let observer1:Observer<String> =  {
    next : v => {
        console.log("next1-" + v);
        if(v==='fail') {
            throw new Error("fail1");
        }
    },
    error : e => console.error("error1-" + e),
    complete : () => console.log("complete1")
};
sub.subscribe(observer1);

try {
    sub.next("msg1");
    sub.next("msg2");
    sub.next("fail");
    sub.next("msg3");
} catch(e) {
    console.log("Caught:" + e);
}

console.log("That's all");
Run Code Online (Sandbox Code Playgroud)

我知道在异常之后,主题基本上已经死了,而 msg3 永远不会成功。

我似乎无法弄清楚如何捕获 Observer 的 next 方法中抛出的异常。

我得到的输出是

> next1-msg1 next1-msg2 next1-fail That's all
> /Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/util/hostReportError.js:9
>     setTimeout(function () { throw err; });
>                              ^
> 
> Error: fail1
>     at Object.next (/Users/peter/playground/rxjsstuff/dist/rxjs1.js:97:19)
>     at SafeSubscriber.__tryOrUnsub (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:263:16)
>     at SafeSubscriber.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:201:22)
>     at Subscriber._next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:139:26)
>     at Subscriber.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subscriber.js:103:18)
>     at Subject.next (/Users/peter/playground/rxjsstuff/node_modules/rxjs/internal/Subject.js:63:25)
>     at Object.<anonymous> (/Users/peter/playground/rxjsstuff/dist/rxjs1.js:107:9)
>     at Module._compile (internal/modules/cjs/loader.js:678:30)
>     at Object.Module._extensions..js (internal/modules/cjs/loader.js:689:10)
>     at Module.load (internal/modules/cjs/loader.js:589:32)
Run Code Online (Sandbox Code Playgroud)

我想我在这里遗漏了一些基本的东西,但我似乎找不到什么。

我想高层次的问题是 - 当 Subject 调用 next 时,如何处理 Observer 的 next 方法中抛出的错误/异常?

欢迎所有提示!

发送

彼得

car*_*ant 5

简短的回答是你不能也不应该。一个可观察的源如何知道它的观察者会抛出什么样的错误?

长的答案是 RxJS 中的错误处理已经改变 - 在版本 6 中变得更好。

如果您查看nextin的实现,Subject您会发现没有错误处理:

next(value?: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

主题只是循环遍历它的观察者和next每个观察者。

然而,每个观察者都被包裹在一个Subscriber. 如果您查看 的源代码subscribe,您会看到 aSubscriber是通过将观察者传递给来创建的toSubscriber

特别是,创建的订阅者是SafeSubscriber. 这就是错误处理的地方。

如果你看一下nextSafeSubscriber,你会看到__tryOrUnsub被称为:

next(value?: T): void {
  if (!this.isStopped && this._next) {
    const { _parentSubscriber } = this;
    if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
      this.__tryOrUnsub(this._next, value);
    } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
      this.unsubscribe();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

__tryOrUnsub将尝试调用观察者的next,如果发生错误,它将从源取消订阅观察者。

任何被捕获的错误__tryOrUnsub都将报告使用hostReportError- 异步抛出错误,以便调用堆栈不会展开。这样做是为了使一个观察者中发生的错误不会影响其他观察者。

如果您在示例中添加第二个观察者 - 不会抛出 - ,您应该看到第二个观察者的行为与您期望的一样并接收"msg3"

Ben Lesh 在最近的一次演讲中解释了这些变化——以及做出这些变化的原因。你可能想检查一下。