相关疑难解决方法(0)

如何处理RxJava中观察者的onNext引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这将输出1到5之间的数字,然后打印异常.

我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.

我已经尝试过使用retry()其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.

最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.

我该怎么做才能解决这个问题?

java reactive-programming system.reactive rx-java

27
推荐指数
1
解决办法
6656
查看次数

使用ObserveOn时如何处理OnNext中的异常?

OnNext当我使用时,观察者抛出错误时我的应用程序终止ObserveOn(Scheduler.ThreadPool).我发现处理这个问题的唯一方法是使用下面的自定义扩展方法(除了确保OnNext永远不会抛出异常).然后确保每个ObserveOn后跟一个ExceptionToError.

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }
Run Code Online (Sandbox Code Playgroud)

但是,这感觉不对.有没有更好的方法来处理这个?

由于未捕获的异常,此程序崩溃.

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

10
推荐指数
2
解决办法
5286
查看次数

如何在SelectMany语句中处理异步方法的异常

我试图使用Rx异步处理一些任务,例如

var list = Enumerable.Range(0, 100)
    .ToObservable()
    .SelectMany(x => Observable.Start(() => {
        Console.WriteLine("Processing {0} ...", x);

        Thread.Sleep(100 * x % 3);

        if (x > 90) {
            Console.WriteLine("Procesing exception {0} > 90", x);
            throw new Exception("Value too large");
        }
        Console.WriteLine("Processing {0} completed.", x);
        return x;
    }))
    .Subscribe(
        x => { Console.WriteLine("Next [{0}]", x); },
        e => {
            Console.WriteLine("Exception:");
            Console.WriteLine(e.Message);
        },
        () => { Console.WriteLine("Complete"); }
    );
Run Code Online (Sandbox Code Playgroud)

我对此代码的问题是异常未传递给订阅者.所以,经过大量的尝试,我放弃了,并决定问这个简单的问题:

如何处理SelectMany语句中异步方法中引发的异常?

为了说清楚,最终的实现是一个同步的函数调用,可能会或可能不会抛出异常.目标是将其传递给订户,以便可以进一步处理(在特定情况下,将向用户显示消息).

编辑

我将我的发现归结为一个答案,以便我可以将这个问题标记为已回答.就个人而言,我不同意自我回答......但有时没有别的办法,所以很抱歉.

.net c# .net-4.0 system.reactive

5
推荐指数
1
解决办法
1425
查看次数