Reactive Extensions吞下在线程池线程上调用的OnNext()的异常?

Gar*_*ang 4 c# reactive-programming system.reactive

我在.Net 4.5中使用Rx 2.当以下代码运行时,它只是静默退出而不执行OnCompleted委托或显示任何错误.如果我使用Scheduler.CurrentThreadin ToObservable,它将至少抛出错误并终止程序,此时不执行OnCompleted是有意义的.但是当它在主线程之外的线程中执行时,这种行为似乎是不合理的,也是不可接受的.我想念什么吗?

static void Main()
{
     Enumerable.Range(0, 1)
                           .ToObservable(Scheduler.Default)
                           .Subscribe(o => { throw new Exception("blah"); }, () => Console.WriteLine("completed"));

     Thread.Sleep(2000);
 }
Run Code Online (Sandbox Code Playgroud)

编辑: 是的,当作为控制台应用程序运行时,无论执行观察的是什么线程,它都将始终抛出错误.

但是,当我在NUnit中将此代码作为测试运行时,如下所示,它在2秒后(线程休眠时间)静默退出,没有任何错误或消息(期望"已完成").那么实际上NUnit会导致这个问题吗?

[TestFixture]
class Program
{
    [Test]
    public void Test()
    {
        Enumerable.Range(0, 1)
                .ToObservable(Scheduler.Default)
                .Subscribe(
                    o => { throw new Exception("blah"); }, 
                    () => Console.WriteLine("completed"));
        Thread.Sleep(2000);
    }
}
Run Code Online (Sandbox Code Playgroud)

Dav*_*ton 9

Rx不会捕获观察者抛出的异常.这是一个非常重要的设计原则,之前已经详细讨论过,但由于某种原因,它仅作为Rx设计指南中§6.4的脚注.

注意:不要保护对Subscribe,Dispose,OnNext,OnErrorOnCompleted方法的调用.这些电话是在monad的边缘.从这些位置调用OnError方法将导致意外行为.

从本质上讲,本指南确保从观察者的角度来看,OnError只会被源自observable本身的异常调用,包括对直接参与计算的用户代码的任何调用(而不仅仅是观察结果).如果不是这种情况,则观察者可能无法区分传递给OnError的异常是否是其OnNext处理程序中的错误,或者可能是observable中的错误.

但更重要的是,它还确保OnNext处理程序抛出的任何异常都不会被处理.这样可以更轻松地调试程序并保护用户数据.

话虽这么说,当您在池化线程上执行OnNext时,您可能正在观察不同行为的原因仅仅是您的调试体验的结果.尝试启用第一次机会异常.

此外,我还会通过改变Thread.Sleep来避免竞争条件Console.ReadKey().