not*_*row 2 c# system.reactive
我有独立事件流,与Reactive扩展异步处理.处理程序可能因任何原因而失败,但流继续.
但是,在Rx中,错误发生后,它会自动取消订阅.这有点可配置吗?
例:
async Task<Unit> ActionAsync(int i)
{
if (i > 1)
throw new Exception();
i.Dump();
return Unit.Default;
}
void Main()
{
var sb = new Subject<int>();
sb.SelectMany(ActionAsync).Subscribe(
_ => { },
ex =>
{
ex.Dump();
}
);
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)
我想要以下输出:
我可以在不尝试/捕捉的情况下实现这一目标ActionAsync吗?
在Rx中存在一个行为契约,其中流只能是OnNext*(OnError|OnCompleted).换句话说,零个或多个OnNext,只有一个OnError或OnCompleted最后一个.
所以,不,你不能配置Rx.如果你这样做,它将不再是Rx.
但是,您可以执行的操作是编写可以重试源的查询.
如果您编写如下代码:
async Task<int> ActionAsync(int i)
{
if (i == 2)
throw new Exception();
return i;
}
void Main()
{
var sb = new Subject<int>();
sb
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry()
.Subscribe(_ => _.Dump());
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
Run Code Online (Sandbox Code Playgroud)
然后你得到:
1 Exception of type 'System.Exception' was thrown. 3
根据您对性能问题的评论,使用中没有任何性能问题.Retry(),但存在行为问题.
如果源是冷的 - var sb = new [] { 1, 2, 3 }.ToObservable();那么.Retry()将再次从整个可观察序列开始并导致无限序列:
1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. 1 Exception of type 'System.Exception' was thrown. ...
在你的代码中,observable是一个热点可观察的,所以这不会发生.
如果你想在寒冷的观察中做到这一点,你需要让它变热.Publish(...).像这样:
var sb = new[] { 1, 2, 3 }.ToObservable();
sb
.Publish(sbp =>
sbp
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry())
.Subscribe(_ => _.Dump());
Run Code Online (Sandbox Code Playgroud)
然后预期的行为返回.
| 归档时间: |
|
| 查看次数: |
259 次 |
| 最近记录: |