如何在RxJava 2中发生错误后继续处理?

Ada*_*old 5 java kotlin rx-java2

我有一个PublishSubject和一个Subscriber我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()
Run Code Online (Sandbox Code Playgroud)

我的问题是没有任何错误处理方法可以帮助我:

onErrorResumeNext() - 指示Observable在遇到错误时发出一系列项目

onErrorReturn(?) - 指示Observable在遇到错误时发出特定项目

onExceptionResumeNext(?) - 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)

retry(?) - 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错

retryWhen(?) - 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源

我尝试retry()了例子,但它无限期地在错误之后挂起我的进程.

我也试过,onErrorResumeNext()但它没有按预期工作:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()
Run Code Online (Sandbox Code Playgroud)

这只打印foobar.

aka*_*okd 9

如果你想在错误后继续处理,这意味着你的错误就像你String的错误一样,应该通过onNext.为确保在这种情况下的类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误; 例如,io.reactivex.Notification<T>它在RxJava 2中可用:

PublishSubject<Notification<String>> subject = PublishSubject.create();

subject.subscribe(System.out::println);

subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
Run Code Online (Sandbox Code Playgroud)

  • 像Try &lt;Optional &lt;T &gt;&gt;这样的东西被压缩成一个类。 (2认同)