RX - 在包含方法中重新抛出错误

Chr*_*sCa 5 .net c# reactive-programming system.reactive

我需要将 RX 流 ( IObservable) 中的错误转换为包含对流的订阅的方法中的异常

(因为这个问题https://github.com/aspnet/SignalR/pull/1331,因此错误不会序列化到客户端。)一旦这个问题得到解决,我将恢复正确处理错误

例如

我有以下方法

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return _mySvc.ThingChanged();
}
Run Code Online (Sandbox Code Playgroud)

所以我尝试订阅流并重新抛出错误,但它仍然没有传输到客户端:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    _mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
    return _mySvc.ThingChanged();
}

private void OnError(Exception exception)
{
    throw new Exception(exception.Message);
}
Run Code Online (Sandbox Code Playgroud)

我需要的是投入 LiveStream 方法的等价物

例如这个错误被传播给客户端

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    throw new Exception("some error message");
    return _mySvc.ThingChanged();
}
Run Code Online (Sandbox Code Playgroud)

任何想法如何实现这一目标?

Eni*_*ity 0

试试这个代码:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return
        _mySvc
            .ThingChanged()
            .Materialize()
            .Do(x =>
            {
                if (x.Kind == NotificationKind.OnError)
                {
                    OnError(x.Exception);
                }
            })
            .Dematerialize();
}
Run Code Online (Sandbox Code Playgroud)

我不确定这是最好的方法 - 抛出这样的异常可能会导致您在流中感到悲伤,最终导致错误的异常处理程序触发。您可能需要寻找其他解决方案。