如果中间有linq方法,则rx处理异常订阅

ie1*_*ie1 1 c# reactive-programming system.reactive

当我订阅有时抛出异常的方法时,我会得到2个不同的行为.如果我在中间连接LINQ方法,那么订阅就会被处理掉,另外一点也不是,为什么呢?

void main(){
  var numbersSubject=new Subject<int>();

  numbersSubject.subscribe(throwMethod);   // 1,2,3,4,6,7,8,9,10
  // numbersSubject.select(num=>num).subscribe(throwMethod);   // 1,2,3,4

  for(int i=0;i<10;i++)
  {
    try{
      numbersSubject.OnNext(i);
    }catch{}
  }
}

void throwMethod(int num)
{
   if(num==5)
       throw new Exception();
   Console.writeLine(i);
}
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 8

所以,澄清一下:

当运行没有 LINQ运算符的版本时,我们会看到:

0,1,2,3,4,6,7,8,9
Run Code Online (Sandbox Code Playgroud)

当运行LINQ运算符的版本时,我们看到:

0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)

值得注意的是,当您向第二个版本订阅一个好的和坏的行为Observer时,您将获得如下所示的输出(注释中的输出),如下所示:

numbersSubject.Subscribe(throwMethod);   
var source = numbersSubject.Select(num=>num);
source.Subscribe(Console.WriteLine); // 0,1,2,3,4,5,6,7,8,9
source.Subscribe(throwMethod);   // 0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)

请注意,"好"观察者获取所有事件.

原因是构建运算符有一个保护层来处理坏Observer 的订阅.

从Rx源代码我们看到:

为了适当的资源清理,需要保护管道免受恶意观察员的侵害.请考虑以下示例:

var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
var ys  = <some random sequence>;
var res = xs.CombineLatest(ys, (x, y) => x + y);
Run Code Online (Sandbox Code Playgroud)

上面查询的大理石图如下所示:

xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
               |     |     |     |     |     |     |     |     |
ys  --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
            |  |  |  |     |  |  |  |  |     |     |  |  |     |
            v  v  v  v     v  v  v  v  v     v     v  v  v     v
res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
                              |
                             @#&
Run Code Online (Sandbox Code Playgroud)

注意Rx的自由线程特性,其中结果序列上的消息由两个输入序列中的任何一个产生CombineLatest.

现在假设在上面标记的指示点OnNext的观察者的回调中发生异常.回调在上下文中运行,因此异常将取消调度程序线程.这本身就是一个问题(可以通过运算符来缓解),但请注意生成的计时器是如何保持活动的.res@#&ysysCatchISchedulerxs

安全保护代码确保在用户回调抛出时处理所获取的资源.


以上是在称为内部类中实现AutoDetachObserver和包裹通过SafeObserver所使用的大多数内部运算符.

所有这一切都是使用try...finally异常处理程序包装每个OnXXX调用,其中finally块在发生错误时处理订阅 - 例如OnNext:

        var __noError = false;
        try
        {
            observer.OnNext(value);
            __noError = true;
        }
        finally
        {
            if (!__noError)
                Dispose();
        }
Run Code Online (Sandbox Code Playgroud)

Subject,出于性能原因,没有这层保护.添加它(以及防止其他滥用)的快速方法是将Synchronize()操作符添加到主题中.例如:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Subscribe(throwMethod);
Run Code Online (Sandbox Code Playgroud)

会输出

0,1,2,3,4,6,7,8,9
Run Code Online (Sandbox Code Playgroud)

Synchronize如图所示添加:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Synchronize().Subscribe(throwMethod);
Run Code Online (Sandbox Code Playgroud)

会输出

0,1,2,3,4
Run Code Online (Sandbox Code Playgroud)

与其他内置运算符一致(加上你实现的任何运算符Observable.Create).

(编辑)关于处理观察员例外的说明

在评论的提示下,这里有一些关于处理从观察者抛出的异常的附加说明.

当异常来自OnNext处理程序时,它位于堆栈的Rx代码之下,因此无法负责地将其"返回"传递给用户.此时用户必须被视为死亡.我们可以合理地做的就是处理订阅并清理由此产生的资源.这是基于推送的代码的结果.与IEnumerable对比,可以将异常抛出到客户端代码,因为它是客户端执行拉动.

请注意,某些运算符包含用户提供的逻辑(如Where运算符中的谓词表达式),它将通过OnError通道错误传播给观察者,但是一旦观察者通过在其自己的代码中抛出异常而死亡 - 就是这样.它不会再通过任何OnXXX方法调用.

这部史诗般的Bart de Smet帖子的一半左右开始,还有更多的内容.