P_K*_*ing 5 java android kotlin rx-java reactivex
在我的应用程序中,我有一些耗时的逻辑,可以通过多种方式启动,比如自动或由用户手动启动。
// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()
// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }
val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one
val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer
Run Code Online (Sandbox Code Playgroud)
启动继电器会产生许多排放。假设用户单击按钮进行手动启动,距离定时器自动启动还剩 5 秒。longOperation()如果它很简单,这两个事件都将导致开始flatMap。我只想要一个线程在longOperation()里面运行,所以如果它现在正在运行并且没有完成 - 忽略启动排放,无论如何完成将导致计时器重启。
ConcatMap在一半帮助我 - 它增加longOperation()了“队列”,所以它们被一个一个地处理,但是我怎么能写这个来忽略任何进一步的开始,直到第一个完全完成?
您可以使用flatMap()额外的整数参数来限制并行性。
syncStarts
.onBackpressureDrop() // 1
.flatMap(() -> longOperation(), 1) // 2
...
Run Code Online (Sandbox Code Playgroud)
flatMap()减少繁忙时发生的任何排放。flatMap(),本质上是强制操作按顺序进行。上面的功能就是你想要的功能。但是,您没有指定运行后想要发生的情况longOperation():您是否希望在运行后立即启动另一个操作?如果是这样,您需要更改背压处理以最多排队一次发射。