如何在SelectMany语句中处理异步方法的异常

Axe*_*ger 5 .net c# .net-4.0 system.reactive

我试图使用Rx异步处理一些任务,例如

var list = Enumerable.Range(0, 100)
    .ToObservable()
    .SelectMany(x => Observable.Start(() => {
        Console.WriteLine("Processing {0} ...", x);

        Thread.Sleep(100 * x % 3);

        if (x > 90) {
            Console.WriteLine("Procesing exception {0} > 90", x);
            throw new Exception("Value too large");
        }
        Console.WriteLine("Processing {0} completed.", x);
        return x;
    }))
    .Subscribe(
        x => { Console.WriteLine("Next [{0}]", x); },
        e => {
            Console.WriteLine("Exception:");
            Console.WriteLine(e.Message);
        },
        () => { Console.WriteLine("Complete"); }
    );
Run Code Online (Sandbox Code Playgroud)

我对此代码的问题是异常未传递给订阅者.所以,经过大量的尝试,我放弃了,并决定问这个简单的问题:

如何处理SelectMany语句中异步方法中引发的异常?

为了说清楚,最终的实现是一个同步的函数调用,可能会或可能不会抛出异常.目标是将其传递给订户,以便可以进一步处理(在特定情况下,将向用户显示消息).

编辑

我将我的发现归结为一个答案,以便我可以将这个问题标记为已回答.就个人而言,我不同意自我回答......但有时没有别的办法,所以很抱歉.

Axe*_*ger 2

答案

实际上代码工作正常。然而,调试器在异常处中断,因为异步操作仍然在后台执行 - 至少是那些在第一个异常发生时已经启动的操作。扔我了!如果您在没有调试器的情况下运行代码,则异常会被吞掉。所以我猜问题确实出在计算机前面:-)

仍然有一些关于我假设的澄清Observable.Start- 这是正确的 - 实现实际上应该实现一些错误处理......请参阅背景。

背景

Observable.Start是一种便捷方法,使用该Observable.ToAsync方法将函数/操作转换为异步操作。如果您查看该方法的实现,您会发现它已经执行了异常处理/转发。

public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
    if (function != null) {
        if (scheduler != null) {
            return () => {
                AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
                scheduler.Schedule(() => {
                    TResult result = default(TResult);
                    try {
                        result = function();
                    } catch (Exception exception1) {
                        Exception exception = exception1;
                        asyncSubject.OnError(exception);
                        return;
                    }
                    asyncSubject.OnNext(result);
                    asyncSubject.OnCompleted();
                });
                return asyncSubject.AsObservable<TResult>();
            };
        } else {
            throw new ArgumentNullException("scheduler");
        }
    } else {
        throw new ArgumentNullException("function");
    }
}
Run Code Online (Sandbox Code Playgroud)