C#Rx - 忽略错误

not*_*row 2 c# system.reactive

我有独立事件流,与Reactive扩展异步处理.处理程序可能因任何原因而失败,但流继续.

但是,在Rx中,错误发生后,它会自动取消订阅.这有点可配置吗?

例:

async Task<Unit> ActionAsync(int i)
{
    if (i > 1)
        throw new Exception();

    i.Dump();       
    return Unit.Default;
}

void Main()
{
    var sb = new Subject<int>();

    sb.SelectMany(ActionAsync).Subscribe(
        _ => { },
        ex =>
        {
            ex.Dump();
        }
    );


    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)

我想要以下输出:

  • 1
  • 例外
  • 3

我可以在不尝试/捕捉的情况下实现这一目标ActionAsync吗?

Eni*_*ity 6

在Rx中存在一个行为契约,其中流只能是OnNext*(OnError|OnCompleted).换句话说,零个或多个OnNext,只有一个OnErrorOnCompleted最后一个.

所以,不,你不能配置Rx.如果你这样做,它将不再是Rx.

但是,您可以执行的操作是编写可以重试源的查询.

如果您编写如下代码:

async Task<int> ActionAsync(int i)
{
    if (i == 2)
        throw new Exception();

    return i;
}

void Main()
{
    var sb = new Subject<int>();

    sb
        .SelectMany(ActionAsync)
        .Do(_ => { }, ex => ex.Dump())
        .Retry()
        .Subscribe(_ => _.Dump());

    sb.OnNext(1);
    sb.OnNext(2);
    sb.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)

然后你得到:

1
Exception of type 'System.Exception' was thrown. 
3

根据您对性能问题的评论,使用中没有任何性能问题.Retry(),但存在行为问题.

如果源是冷的 - var sb = new [] { 1, 2, 3 }.ToObservable();那么.Retry()将再次从整个可观察序列开始并导致无限序列:

1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
1
Exception of type 'System.Exception' was thrown. 
...

在你的代码中,observable是一个热点可观察的,所以这不会发生.

如果你想在寒冷的观察中做到这一点,你需要让它变热.Publish(...).像这样:

var sb = new[] { 1, 2, 3 }.ToObservable();

sb
    .Publish(sbp =>
        sbp
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())
            .Retry())
    .Subscribe(_ => _.Dump());
Run Code Online (Sandbox Code Playgroud)

然后预期的行为返回.