相关疑难解决方法(0)

捕获可能从Subscription OnNext Action抛出的异常

我对Rx.NET有些新意.是否有可能捕获任何订阅者可能抛出的异常?采取以下措施......

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });
Run Code Online (Sandbox Code Playgroud)

目前,我正在以每个订阅为基础,使用以下实例.其实现只使用ManualResetEvent来唤醒等待的线程.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}
Run Code Online (Sandbox Code Playgroud)

并像这样使用它......

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });
Run Code Online (Sandbox Code Playgroud)

我觉得必须有更好的方法.Rx.NET中的所有错误处理功能是否专门用于处理可观察到的错误?

编辑:根据请求,我的实现是https://gist.github.com/1409829(接口和实现分为prod代码中的不同程序集).欢迎反馈.这可能看起来很愚蠢,但我正在使用城堡windsor来管理许多不同的Rx用户.此异常捕获器已在此容器中注册

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));
Run Code Online (Sandbox Code Playgroud)

然后就像这样使用observableIObservable的实例:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed …
Run Code Online (Sandbox Code Playgroud)

c# asynchronous exception-handling subscription system.reactive

9
推荐指数
1
解决办法
3004
查看次数

从长时间运行的事件聚合器吞噬异常可观察

我正在使用一个简单的Subject<object>方法在Web应用程序中实现Event Aggregator模式,如下所示:

public class EventAggregator
{
    private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());
    public IObservable<T> GetEvent<T>()
    {
        return _subject.AsObservable().OfType<T>();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}
Run Code Online (Sandbox Code Playgroud)

当运营商或订户抛出异常时,我想记录它然后忽略它并继续发布事件 - 本质上我希望流在某些意外行为的情况下"继续",因为事件聚合器在单个生命周期中具有单个生命周期.申请的背景.

这个问题中给出的答案是创建一个延迟的可观察和使用Retry(),但我不想在这里冷可观察.

我提出的解决方案是Catch在订阅者方法中使用和try-catch,我将其包含在扩展中:

public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
{
    source = source.Catch((Exception e) => { log(e); return source; });

    return source.Subscribe(x =>
    {
        try { subscription(x); }
        catch (Exception e) { log(e); }
    });
}
Run Code Online (Sandbox Code Playgroud)

我知道"catch-all"异常处理通常是不受欢迎的,但在这种情况下,我不确定我有什么其他选项,因为我希望订阅保留,即使抛出异常.我不知道可能会发生什么类型的异常,因为我还不知道在处理流时会做什么工作. …

c# system.reactive

6
推荐指数
1
解决办法
420
查看次数

在将 Rx 事件提供给订阅者之前和之后修改 C# 中的 Rx 事件

每次观察到事件时,我都需要对其进行预处理和后处理。这是我想出的东西,它有效:

var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
    subj.Finally(obs.OnCompleted); 
    return subj.Subscribe(e =>
    {
        try
        {
            Preprocess(e);
            obs.OnNext(e);
            Postprocess(e);
        }
        catch (Exception ex) { obs.OnError(ex); }
    });
});
Run Code Online (Sandbox Code Playgroud)

我的问题:这是正确的方法,还是有更好的模板/扩展方法?

c# system.reactive

3
推荐指数
1
解决办法
191
查看次数