RxJava2
kotlin
Run Code Online (Sandbox Code Playgroud)
这工作正常,我可以连接 2 个观察对象
Observable.concat(countries(), animals())
.subscribeBy {
println(it)
}
Run Code Online (Sandbox Code Playgroud)
这是我无法理解的示例,因为它使用了一个似乎采用 ObservableSource 的 lambda,我想连接 2 个 observable,但它会导致空异常。只是想知道我做错了什么。将 lambda 与 concat 一起使用的目的是什么?
Observable.concat<String> {
it.onNext(countries())
it.onNext(animals())
}.subscribeBy {
println(it)
}
private fun animals(): Observable<String> =
Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")
private fun countries(): Observable<String> =
Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
Run Code Online (Sandbox Code Playgroud)
这是我遇到的崩溃:
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
Run Code Online (Sandbox Code Playgroud)
这是ObservableSource我想我所指的接口。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
Run Code Online (Sandbox Code Playgroud)
非常感谢您的任何建议
将 lambda 传递给 concat 解析为concat(ObservableSource<? extends ObservableSource<? extends T>> sources). 因为ObservableSource是一个带有单个非默认方法的接口,这会触发 Kotlin 的SAM 转换。这就是它选择重载的原因——它是唯一一个可以通过 SAM 转换实现的接口。
因此,lambda 是该ObservableSource.subscribe(Observer<? super T> observer)方法的实现。该方法记录为:
将给定的观察者订阅到此 ObservableSource 实例。
因此,lambda 需要将参数 ( it)订阅到 的源Observables。你得到一个,NullPointerException因为不是订阅它,你开始调用onNext一个Observer尚未订阅的,因此内部状态不正确(在这种情况下,有一个尚未设置的队列,但这不是特别重要)。
要实现该方法的契约,您只需创建一个Observable发出Observables 并在 lambda 中订阅it(the Observer)Observable的对象,如下所示:
Observable.concat<String> {
Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
println(it)
}
Run Code Online (Sandbox Code Playgroud)
我已经在本地进行了测试,它产生了预期的结果。
| 归档时间: |
|
| 查看次数: |
314 次 |
| 最近记录: |