捕获可能从Subscription OnNext Action抛出的异常

drs*_*ens 9 c# asynchronous exception-handling subscription system.reactive

我对Rx.NET有些新意.是否有可能捕获任何订阅者可能抛出的异常?采取以下措施......

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });
Run Code Online (Sandbox Code Playgroud)

目前,我正在以每个订阅为基础,使用以下实例.其实现只使用ManualResetEvent来唤醒等待的线程.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}
Run Code Online (Sandbox Code Playgroud)

并像这样使用它......

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });
Run Code Online (Sandbox Code Playgroud)

我觉得必须有更好的方法.Rx.NET中的所有错误处理功能是否专门用于处理可观察到的错误?

编辑:根据请求,我的实现是https://gist.github.com/1409829(接口和实现分为prod代码中的不同程序集).欢迎反馈.这可能看起来很愚蠢,但我正在使用城堡windsor来管理许多不同的Rx用户.此异常捕获器已在此容器中注册

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));
Run Code Online (Sandbox Code Playgroud)

然后就像这样使用observableIObservable的实例:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);
Run Code Online (Sandbox Code Playgroud)

Gid*_*rth 9

内置的Observable操作符默认情况下不会执行您所要求的操作(很像事件),但您可以创建一个执行此操作的扩展方法.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.CreateWithDisposable<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}
Run Code Online (Sandbox Code Playgroud)

然后可以使用此方法包装任何observable以获得您描述的行为.

  • @drstevens 它将捕获来自同一线程的异常。如果您的观察者正在启动自己的异步操作,抛出异常,则不会捕获这些异常。 (2认同)
  • 我编辑了答案,因为较新的 Rx 包不再具有 `Observable.CreateWithDisposable` 方法。相反,他们有一个重载的 [`Observable.Create`](https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable.Creation.cs) 方法。 (2认同)