Rx 运算符。忽略直到发出下一个

P_K*_*ing 5 java android kotlin rx-java reactivex

在我的应用程序中,我有一些耗时的逻辑,可以通过多种方式启动,比如自动或由用户手动启动。

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
        .merge(
                autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
                manualStarts
        )
        .subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
        .concatMap {
            longOperation()
        }
        .subscribe(autoStarts) // end of long operation trigger start of auto timer
Run Code Online (Sandbox Code Playgroud)

启动继电器会产生许多排放。假设用户单击按钮进行手动启动,距离定时器自动启动还剩 5 秒。longOperation()如果它很简单,这两个事件都将导致开始flatMap。我只想要一个线程在longOperation()里面运行,所以如果它现在正在运行并且没有完成 - 忽略启动排放,无论如何完成将导致计时器重启。

ConcatMap在一半帮助我 - 它增加longOperation()了“队列”,所以它们被一个一个地处理,但是我怎么能写这个来忽略任何进一步的开始,直到第一个完全完成?

Bob*_*ish 3

您可以使用flatMap()额外的整数参数来限制并行性。

syncStarts
  .onBackpressureDrop()               // 1
  .flatMap(() -> longOperation(), 1)  // 2
  ...
Run Code Online (Sandbox Code Playgroud)
  1. flatMap()减少繁忙时发生的任何排放。
  2. 数字 1 是订阅的数量flatMap(),本质上是强制操作按顺序进行。

上面的功能就是你想要的功能。但是,您没有指定运行后想要发生的情况longOperation():您是否希望在运行后立即启动另一个操作?如果是这样,您需要更改背压处理以最多排队一次发射。