我有 Observable,当某些数据来自 BLE 连接时,它会发出项目:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
Run Code Online (Sandbox Code Playgroud)
除此之外,我想构建一个Observable,completes当满足以下条件之一时:
A。我收到两条特定类型的消息(这是通过filter在收到的DataContainer项目上使用运算符来完成的。
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
Run Code Online (Sandbox Code Playgroud)
b. 我收到一条特定类型的消息(再次使用filter运算符)。
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
Run Code Online (Sandbox Code Playgroud)
我怎样才能将这两个合并Observable为一个?此外,两个 Observable 中只有一个会发出一个项目。
我认为你的答案将是 Observable.merge:http ://reactivex.io/documentation/operators/merge.html
根据文档,这并不等待每个可观察到的东西发出一些东西。
一种选择是使用 merge 和 takeUntil:http://reactivex.io/documentation/operators/takeuntil.html
科特林代码:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1188 次 |
| 最近记录: |