从长时间运行的事件聚合器吞噬异常可观察

Ale*_*ill 6 c# system.reactive

我正在使用一个简单的Subject<object>方法在Web应用程序中实现Event Aggregator模式,如下所示:

public class EventAggregator
{
    private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());
    public IObservable<T> GetEvent<T>()
    {
        return _subject.AsObservable().OfType<T>();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}
Run Code Online (Sandbox Code Playgroud)

当运营商或订户抛出异常时,我想记录它然后忽略它并继续发布事件 - 本质上我希望流在某些意外行为的情况下"继续",因为事件聚合器在单个生命周期中具有单个生命周期.申请的背景.

这个问题中给出的答案是创建一个延迟的可观察和使用Retry(),但我不想在这里冷可观察.

我提出的解决方案是Catch在订阅者方法中使用和try-catch,我将其包含在扩展中:

public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
{
    source = source.Catch((Exception e) => { log(e); return source; });

    return source.Subscribe(x =>
    {
        try { subscription(x); }
        catch (Exception e) { log(e); }
    });
}
Run Code Online (Sandbox Code Playgroud)

我知道"catch-all"异常处理通常是不受欢迎的,但在这种情况下,我不确定我有什么其他选项,因为我希望订阅保留,即使抛出异常.我不知道可能会发生什么类型的异常,因为我还不知道在处理流时会做什么工作.

这是处理潜在异常的可接受方式吗?您能否预见到任何可能使我采用这种方法的问题?

Jam*_*rld 1

这是一个有问题的解决方案。一旦订阅者抛出异常,您必须真正假设他们此时已死在水中(您几乎无能为力并且不再进一步调用它们。通过您的方法,“变坏”的订阅者将继续发送事件,并且可能会无限地抛出异常。

此外,您不能真正依赖使用您的扩展方法的订阅者。我会GetEvent<T>按如下方式重写您的方法(显然Console.WriteLine用一些日志记录替换我的方法)。这种方法将处理由于不良订阅者而导致的订阅,并保持其他人运行。

public IObservable<T> GetEvent<T>()
{       
    return Observable.Create<T>(o =>
    {
        var source = _subject.OfType<T>();
        var m = new SingleAssignmentDisposable();
        m.Disposable = source.Subscribe(
            x => {
                try {
                    o.OnNext(x);                        
                }
                catch(Exception e) {
                    Console.WriteLine(e);
                    m.Dispose();
                }
            },
            e => {
                try {
                    o.OnError(e);                       
                }
                catch(Exception ex) {
                    Console.WriteLine(ex);
                }
                finally {
                    m.Dispose();
                }
            },
            () => {
                try {
                    o.OnCompleted();                        
                }
                catch(Exception e) {
                    Console.WriteLine(e);
                }
                finally {
                    m.Dispose();
                }
            }               
        );

        return m;
    });
}
Run Code Online (Sandbox Code Playgroud)