Ada*_*old 5 java kotlin rx-java2
我有一个PublishSubject
和一个Subscriber
我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
Run Code Online (Sandbox Code Playgroud)
我的问题是没有任何错误处理方法可以帮助我:
onErrorResumeNext()
- 指示Observable在遇到错误时发出一系列项目
onErrorReturn(?)
- 指示Observable在遇到错误时发出特定项目
onExceptionResumeNext(?)
- 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)
retry(?)
- 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错
retryWhen(?)
- 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源
我尝试retry()
了例子,但它无限期地在错误之后挂起我的进程.
我也试过,onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
Run Code Online (Sandbox Code Playgroud)
这只打印foo
和bar
.
如果你想在错误后继续处理,这意味着你的错误就像你String
的错误一样,应该通过onNext
.为确保在这种情况下的类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误; 例如,io.reactivex.Notification<T>
它在RxJava 2中可用:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2522 次 |
最近记录: |