如何在没有.flatMap的情况下控制流,这会打破一个反应流,阻止像distinctUntilChanged这样的运营商在整个流上工作

Zak*_*rdi 6 algebraic-data-types reactive-programming kotlin rx-java rx-java2

我想为不同的实现处理不同的可观察逻辑链State.这可以通过密封的类/代数数据类型/ union +轻松实现.flatMap(),但这会打破流,操作员.distinctUntilChanged()只能在.flatMap()函数内工作,而不是整个流本身.

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()
    relay.flatMap {
        fun handle(state: State): Observable<*> = when (state) {
            State.Loading -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println("loading") }

            is State.Loaded -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println(it.value) }
        }
        handle(it)
    }
            .subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    //desired: loading, 1, 2, 3
    //actual: loading, 1, 2, 3, 3
}
Run Code Online (Sandbox Code Playgroud)

注意,这是一个简化的例子.虽然我只是在这里打印,但实际上我想根据实现类型执行不同的操作(以不同的方式呈现UI)State

这可以通过主题/中继来完成,但是这会创建一个断开的,可变的流,我也想避免.

mar*_*ran 1

您能否将其拆分Observable为多个可观察量,每个可观察量都获取单一类型的事件?然后,您可以对这些可观察量执行一些操作,然后再将它们重新合并在一起。

我现在无法对此进行测试,因此可能需要一些调整。不管怎样,我希望你能明白:

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()

    val loadingObs = relay.filter { it is State.Loading }
                          .distinctUntilChanged()
                          .doOnNext { println("loading") }

    val loadedObs = relay.filter { it is State.Loaded }
                         .distinctUntilChanged()
                         .doOnNext { println(it.value) }

    val merged = loadingObs.mergeWith(loadedObs)

    merged.subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    // Hopefully prints this: loading, 1, 2, 3
}
Run Code Online (Sandbox Code Playgroud)