Best practices for handling onError and continuing processing

Ale*_*ggs 26 rx-java

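I'm new to RxJava, but I'm integrating it into a project I'm working on to help me learn it. I've run into a question about best practices.

My question is how to handle onError so that it doesn't stop the Observable from processing.

Here's the setup:

I have a list of userIds, and for each one I'd like to make two or more network requests. If any of the network requests fails for a userId, that userId won't be updated and can be skipped. This should not prevent the other userIds from being processed. I do have a solution, but it involves nested subscribes (see the second code block). One problem I see is that if every call fails, there is no way to short-circuit and stop the rest from hitting the network resource, even after a certain threshold number of failures has been detected.

Is there a better way to do this?

In traditional code: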

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw a GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw a GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}
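The short-circuit concern can be made concrete in the traditional style first. What follows is a minimal sketch, not the asker's code: the two request methods are hypothetical stand-ins that fail for userIds containing "2" and "3" (mirroring the getInfo example further down), and maxFailures is an assumed threshold parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThresholdLoop {

    // Hypothetical stand-in for the first network request.
    static String getInfo(String userId) throws Exception {
        if (userId.contains("2")) {
            throw new Exception("getInfo failed for user " + userId);
        }
        return "1::: " + userId;
    }

    // Hypothetical stand-in for the second network request.
    static String getOtherInfo(String userId) throws Exception {
        if (userId.contains("3")) {
            throw new Exception("getOtherInfo failed for user " + userId);
        }
        return "2::: " + userId;
    }

    // Skips any user whose requests fail, and stops issuing requests
    // once maxFailures users have failed.
    static List<String> fetchAll(List<String> userIds, int maxFailures) {
        List<String> results = new ArrayList<>();
        int failures = 0;
        for (String userId : userIds) {
            if (failures >= maxFailures) {
                break; // threshold reached: do not hit the network again
            }
            try {
                String info = getInfo(userId);
                String otherInfo = getOtherInfo(userId);
                results.add(info + ", " + otherInfo);
            } catch (Exception e) {
                failures++;
                System.err.println("Skipping user " + userId + ": " + e.getMessage());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> userIds = Arrays.asList("1", "2", "3", "4", "5", "6");
        System.out.println(fetchAll(userIds, 10)); // threshold never reached
        System.out.println(fetchAll(userIds, 2));  // stops after two failed users
    }
}
```

With a threshold of 2, users 2 and 3 both fail, so users 4 through 6 are never requested. Any reactive version would need an equivalent way to stop subscribing to further requests.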

The problem:

Pseudocode:

userid -> network requests -> result 
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

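Below is a working example with a list of userIds and two getInfo requests for each one. If you run it, you'll see that it fails at the first error (see the output below the source code).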

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0(){

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

Output:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

Nested subscribe solution:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

Output:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

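As you can see, only the individual userIds that failed stopped their own processing; the rest of the userIds were processed.

I'm just looking for advice on whether this solution makes sense and, if not, what the best practice is.

Thanks, Alex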

zsx*_*ing 18

Since you want to ignore the error, you can try onErrorResumeNext(Observable.<String>empty());. For example,

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
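Note that the snippet above recovers each request separately, so a user with one failed request would still emit the other request's result. If the whole user should be skipped and the error recorded rather than silently dropped, the recovery step can be applied after the two requests are combined. The sketch below illustrates that shape without RxJava, using java.util.concurrent.CompletableFuture as a stand-in for the Observables. The getInfo helper, the failedUserIds list, and the class name are assumptions for illustration only. In RxJava terms, the handle step plays roughly the role of onErrorResumeNext(Observable.<String>empty()) applied to the combined per-user stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class PerUserRecovery {

    // Records which users failed, so errors are handled rather than ignored.
    static final List<String> failedUserIds = new ArrayList<>();

    // Hypothetical request mirroring getInfo(prefix, integer, errorNumber)
    // from the question: fails when userId contains errorNumber.
    static CompletableFuture<String> getInfo(String prefix, String userId, String errorNumber) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (userId.contains(errorNumber)) {
            future.completeExceptionally(new Exception("request failed for user " + userId));
        } else {
            future.complete(prefix + userId);
        }
        return future;
    }

    // Combines both requests for one user; on any failure, records the
    // userId and yields an empty result instead of propagating the error.
    static Optional<String> fetchUser(String userId) {
        CompletableFuture<String> info1 = getInfo("1::: ", userId, "2");
        CompletableFuture<String> info2 = getInfo("2::: ", userId, "3");
        CompletableFuture<Optional<String>> recovered = info1
                .thenCombine(info2, (a, b) -> a + ", " + b)
                .handle((result, error) -> {
                    if (error != null) {
                        failedUserIds.add(userId);
                        return Optional.<String>empty();
                    }
                    return Optional.of(result);
                });
        return recovered.join();
    }

    // Processes every user; a failed user never stops the others.
    static List<String> fetchAll(List<String> userIds) {
        List<String> results = new ArrayList<>();
        for (String userId : userIds) {
            fetchUser(userId).ifPresent(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("1", "2", "3", "4", "5", "6")));
        System.out.println("failed: " + failedUserIds);
    }
}
```

The design choice here is where the recovery sits: placing it after the combination means a failure in either request drops the whole user, which matches the skip-the-user behaviour described in the question.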

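  • What if I don't want to ignore the error (i.e. I want to handle it in the onError callback) but still continue processing the next event? (11 upvotes)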

Mor*_*goo 9

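The best practice is to use mergeDelayError(), which combines multiple Observables into one and allows error-free Observables to continue before the error is propagated.

mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.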
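The difference in error timing described above can be illustrated with a toy model. This is not RxJava's implementation: it is a plain-Java sketch in which each source is a list of events, an Exception element stands for onError, and sources are drained one after another rather than interleaved. The MergeModel and Outcome names are made up for the example, and only the first error is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeModel {

    // What the subscriber would observe: the onNext values, plus the
    // onError exception (null if the stream completed normally).
    static final class Outcome {
        final List<String> values = new ArrayList<>();
        Exception error;
    }

    // delayError == false models merge: the first error ends everything.
    // delayError == true models mergeDelayError: the failing source ends,
    // the remaining sources still emit, and the error is reported last.
    static Outcome drain(List<List<Object>> sources, boolean delayError) {
        Outcome outcome = new Outcome();
        for (List<Object> source : sources) {
            for (Object event : source) {
                if (event instanceof Exception) {
                    if (outcome.error == null) {
                        outcome.error = (Exception) event;
                    }
                    if (!delayError) {
                        return outcome; // merge: terminate immediately
                    }
                    break; // mergeDelayError: only this source stops
                }
                outcome.values.add((String) event);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<List<Object>> sources = Arrays.asList(
                Arrays.<Object>asList("a1", new Exception("boom")),
                Arrays.<Object>asList("b1", "b2"));
        System.out.println(drain(sources, false).values);
        System.out.println(drain(sources, true).values);
    }
}
```

In both modes the subscriber still ends with an error; the delayed variant only changes how many items arrive before it. That may be why the question's first example, which delays errors only inside each user's pair of requests, still terminates at the first failing user.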

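  • What if I don't want the Observables to complete? I don't think this answers the original question; it assumes that all the Observables involved terminate and that the error can be delayed. (4 upvotes)
  • Agreed with Daniele; this is an explanation of mergeDelayError(), not an answer to the question (2 upvotes)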