Axe*_*ger 5 .net c# .net-4.0 system.reactive
我试图使用Rx异步处理一些任务,例如
var list = Enumerable.Range(0, 100)
.ToObservable()
.SelectMany(x => Observable.Start(() => {
Console.WriteLine("Processing {0} ...", x);
Thread.Sleep(100 * x % 3);
if (x > 90) {
Console.WriteLine("Procesing exception {0} > 90", x);
throw new Exception("Value too large");
}
Console.WriteLine("Processing {0} completed.", x);
return x;
}))
.Subscribe(
x => { Console.WriteLine("Next [{0}]", x); },
e => {
Console.WriteLine("Exception:");
Console.WriteLine(e.Message);
},
() => { Console.WriteLine("Complete"); }
);
Run Code Online (Sandbox Code Playgroud)
我对此代码的问题是异常未传递给订阅者.所以,经过大量的尝试,我放弃了,并决定问这个简单的问题:
如何处理SelectMany语句中异步方法中引发的异常?
为了说清楚,最终的实现是一个同步的函数调用,可能会或可能不会抛出异常.目标是将其传递给订户,以便可以进一步处理(在特定情况下,将向用户显示消息).
编辑
我将我的发现归结为一个答案,以便我可以将这个问题标记为已回答.就个人而言,我不同意自我回答......但有时没有别的办法,所以很抱歉.
答案
实际上代码工作正常。然而,调试器在异常处中断,因为异步操作仍然在后台执行 - 至少是那些在第一个异常发生时已经启动的操作。扔我了!如果您在没有调试器的情况下运行代码,则异常会被吞掉。所以我猜问题确实出在计算机前面:-)
仍然有一些关于我假设的澄清Observable.Start- 这是正确的 - 实现实际上应该实现一些错误处理......请参阅背景。
背景
Observable.Start是一种便捷方法,使用该Observable.ToAsync方法将函数/操作转换为异步操作。如果您查看该方法的实现,您会发现它已经执行了异常处理/转发。
public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
if (function != null) {
if (scheduler != null) {
return () => {
AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
scheduler.Schedule(() => {
TResult result = default(TResult);
try {
result = function();
} catch (Exception exception1) {
Exception exception = exception1;
asyncSubject.OnError(exception);
return;
}
asyncSubject.OnNext(result);
asyncSubject.OnCompleted();
});
return asyncSubject.AsObservable<TResult>();
};
} else {
throw new ArgumentNullException("scheduler");
}
} else {
throw new ArgumentNullException("function");
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1425 次 |
| 最近记录: |