为什么在使用Where或OfType运算符时抛出异常后,被动扩展会停止调用订阅者?

Rob*_*bin 4 c# system.reactive

Test1通过.为什么Test2和Test3失败?我正在使用.NET 4.0和Rx 2.0.

[TestClass]
public class RxQuestion
{
   private Subject<string> sequence;
   [TestInitialize] public void Intialize() { sequence = new Subject<string>(); }
   [TestMethod] public void Test1() { Test(sequence); }
   [TestMethod] public void Test2() { Test(sequence.Where(s => true)); }
   [TestMethod] public void Test3() { Test(sequence.OfType<string>()); }
   private void Test(IObservable<string> observable)
   {
      var observed = string.Empty;
      observable.Subscribe(s => { observed = s; if (s == "a") throw new Exception(); });
      try { sequence.OnNext("a"); } catch { }
      sequence.OnNext("b");
      Assert.AreEqual("b", observed);
   }
}
Run Code Online (Sandbox Code Playgroud)

Lee*_*ell 13

对我来说真正的问题是为什么Test1会通过?对我来说,看起来这种Subject<T>类型并没有像所有其他实现一样遵循相同的规则IObservable<T>.

在更深入的检查(确实反映好),你可以拆分Subject<T>DotPeek/Reflector中的类型,并看到在进行OnNext(T)调用时,它被直接委托给它的_observer实例.在任何订阅之前,这只是一个NullObject/NopObserver.订阅完成后(通常),观察者是一个Observer<T>实现.这个实现实际上是IObserver<T>接口的复合模式实现,它只调用它OnNext(T)的每个实例.

此外,如果我们考虑使用Subscribe的扩展方法,只需要一个OnNext处理程序,我们现在知道我们真正的实现IObserver<T>是一个AnonymousObserver<T>.打开这个,我们看到任何呼叫OnNext(T)都基本上没有受到保护.

现在让我们将其与IObservable<T>来自WhereCast运算符的实现进行比较.这两种扩展方法都将返回一个IObservable<T>扩展Producer<T>该类的实现.当对这些可观察序列之一进行预订时,预订观察者被SafeObserver<T>实现包装.这是关键的区别.

如果我们研究这个实现,我们会看到,对于我们的代码路径,我们的anon观察者将调用其MakeSafe方法.这现在用try/finally包装对OnNext的任何调用.

public void OnNext(T value)
{
    if(this.isStopped!=0)
        return;
    bool flag = true;
    try
    {
        this._onNext(value);
        flag=true;        //Flag only set if OnNext doesn't throw!!
    }
    finally
    {
        if(!flag)
            this._disposable.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

注意,一旦我们有一个安全的观察者,如果有任何OnNext处理程序抛出,那么flag将不会设置为true并且_disposable实例被释放.在这种情况下,_disposable实例表示订阅.

因此,您可以解释为什么原始Subject通过测试以及看似无害的操作员导致行为改变的地方.

至于为什么Subject<T>默认情况下不会出现这种情况,我想这是由于2.0版本中的性能改进.我觉得主题是针对原始性能进行调整的,并假设如果你足够勇敢地使用它们,那么你就知道自己在做什么(即不要丢弃你的OnNext处理程序!).我基于这样的假设,他们默认情况下也从主题中删除了并发安全性,你必须使用Synchronize()扩展方法打开它,也许他们还认为所有这些额外的try/finally调用只应在你选择时付费.选择这些安全功能的方法是做你上面Where(_=>true)或更常见的事情AsObservable().