RxJava:根据条件完成流

Chr*_*her 2 android rx-java2

我有 Observable,当某些数据来自 BLE 连接时,它会发出项目:

public interface CommunicationController {
     Flowable<DataContainer> dataReceived();
}
Run Code Online (Sandbox Code Playgroud)

除此之外,我想构建一个Observablecompletes当满足以下条件之一时:

A。我收到两条特定类型的消息(这是通过filter在收到的DataContainer项目上使用运算符来完成的。

communicationController.dataReceived()
    .filter(data -> isTypeA(data) || isTypeB(data))
    .take(2)
    .toList()
    .map(dataContainers -> doSomeMappingToCommon object) 
Run Code Online (Sandbox Code Playgroud)

b. 我收到一条特定类型的消息(再次使用filter运算符)。

communicationController.dataReceived()
    .filter(data -> isTypeC(data))
    .firstOrError()
    .map(dataContainers -> doSomeMappingToCommon object); 
Run Code Online (Sandbox Code Playgroud)

我怎样才能将这两个合并Observable为一个?此外,两个 Observable 中只有一个会发出一个项目。

Lea*_*mpo 5

我认为你的答案将是 Observable.merge:http ://reactivex.io/documentation/operators/merge.html

根据文档,这并不等待每个可观察到的东西发出一些东西。

一种选择是使用 merge 和 takeUntil:http://reactivex.io/documentation/operators/takeuntil.html

科特林代码:

val stopStream = Observable.merge(...)

streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Run Code Online (Sandbox Code Playgroud)