如何在 lambda 和 ObservableSource 中使用 concat

ant*_*009 4 kotlin rx-java2

RxJava2
kotlin 
Run Code Online (Sandbox Code Playgroud)

这工作正常,我可以连接 2 个观察对象

   Observable.concat(countries(), animals())
                    .subscribeBy {
                        println(it)
                    }
Run Code Online (Sandbox Code Playgroud)

这是我无法理解的示例,因为它使用了一个似乎采用 ObservableSource 的 lambda,我想连接 2 个 observable,但它会导致空异常。只是想知道我做错了什么。将 lambda 与 concat 一起使用的目的是什么?

        Observable.concat<String> {
                    it.onNext(countries())
                    it.onNext(animals())
                }.subscribeBy {
                    println(it)
                }


    private fun animals(): Observable<String> =
            Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")

    private fun countries(): Observable<String> =
            Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
Run Code Online (Sandbox Code Playgroud)

这是我遇到的崩溃:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
Run Code Online (Sandbox Code Playgroud)

这是ObservableSource我想我所指的接口。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}
Run Code Online (Sandbox Code Playgroud)

非常感谢您的任何建议

Rya*_*ley 5

将 lambda 传递给 concat 解析为concat(ObservableSource<? extends ObservableSource<? extends T>> sources). 因为ObservableSource是一个带有单个非默认方法的接口,这会触发 Kotlin 的SAM 转换。这就是它选择重载的原因——它是唯一一个可以通过 SAM 转换实现的接口。

因此,lambda 是该ObservableSource.subscribe(Observer<? super T> observer)方法的实现。该方法记录为:

将给定的观察者订阅到此 ObservableSource 实例。

因此,lambda 需要将参数 ( it)订阅到 的源Observables。你得到一个,NullPointerException因为不是订阅它,你开始调用onNext一个Observer尚未订阅的,因此内部状态不正确(在这种情况下,有一个尚未设置的队列,但这不是特别重要)。

要实现该方法的契约,您只需创建一个Observable发出Observables 并在 lambda 中订阅it(the Observer)Observable的对象,如下所示:

Observable.concat<String> {
    Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
    println(it)
}
Run Code Online (Sandbox Code Playgroud)

我已经在本地进行了测试,它产生了预期的结果。