我对Rx.NET有些新意.是否有可能捕获任何订阅者可能抛出的异常?采取以下措施......
handler.FooStream.Subscribe(
_ => throw new Exception("Bar"),
_ => { });
Run Code Online (Sandbox Code Playgroud)
目前,我正在以每个订阅为基础,使用以下实例.其实现只使用ManualResetEvent来唤醒等待的线程.
public interface IExceptionCatcher
{
Action<T> Exec<T>(Action<T> action);
}
Run Code Online (Sandbox Code Playgroud)
并像这样使用它......
handler.FooStream.Subscribe(
_exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
_ => { });
Run Code Online (Sandbox Code Playgroud)
我觉得必须有更好的方法.Rx.NET中的所有错误处理功能是否专门用于处理可观察到的错误?
编辑:根据请求,我的实现是https://gist.github.com/1409829(接口和实现分为prod代码中的不同程序集).欢迎反馈.这可能看起来很愚蠢,但我正在使用城堡windsor来管理许多不同的Rx用户.此异常捕获器已在此容器中注册
windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));
Run Code Online (Sandbox Code Playgroud)
然后就像这样使用observableIObservable的实例:
var exceptionCatcher =
new ExceptionCatcher(e =>
{
Logger.FatalException(
"Exception caught, shutting down.", e);
// Deal with unmanaged resources here
}, false);
/*
* Normally the code below exists in some class managed …Run Code Online (Sandbox Code Playgroud) c# asynchronous exception-handling subscription system.reactive
我正在使用一个简单的Subject<object>方法在Web应用程序中实现Event Aggregator模式,如下所示:
public class EventAggregator
{
private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());
public IObservable<T> GetEvent<T>()
{
return _subject.AsObservable().OfType<T>();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
_subject.OnNext(sampleEvent);
}
}
Run Code Online (Sandbox Code Playgroud)
当运营商或订户抛出异常时,我想记录它然后忽略它并继续发布事件 - 本质上我希望流在某些意外行为的情况下"继续",因为事件聚合器在单个生命周期中具有单个生命周期.申请的背景.
在这个问题中给出的答案是创建一个延迟的可观察和使用Retry(),但我不想在这里冷可观察.
我提出的解决方案是Catch在订阅者方法中使用和try-catch,我将其包含在扩展中:
public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
{
source = source.Catch((Exception e) => { log(e); return source; });
return source.Subscribe(x =>
{
try { subscription(x); }
catch (Exception e) { log(e); }
});
}
Run Code Online (Sandbox Code Playgroud)
我知道"catch-all"异常处理通常是不受欢迎的,但在这种情况下,我不确定我有什么其他选项,因为我希望订阅保留,即使抛出异常.我不知道可能会发生什么类型的异常,因为我还不知道在处理流时会做什么工作. …
每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:
var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
subj.Finally(obs.OnCompleted);
return subj.Subscribe(e =>
{
try
{
Preprocess(e);
obs.OnNext(e);
Postprocess(e);
}
catch (Exception ex) { obs.OnError(ex); }
});
});
Run Code Online (Sandbox Code Playgroud)
我的问题:这是正确的方法,还是有更好的模板/扩展方法?