如何筛选更改超过阈值的值

Dmy*_*yak 2 rx-java rx-java2

我有一个发出数字的Observable(实际上是整数,双数,BigDecimal doen'st).我有一个在可观察的合成时间定义的阈值.换句话说,在可观察的生命期内,阈值不会改变.

我需要的是过滤掉与阈值内最后一次传递的值不同的值.或者改写它:只有当它与上一次传递的值超过阈值时才传递值.

示例可以更好地解释它:

Source observable values: 
[5, 4, 7, 9, 15, 14, 13, 12, 11, 7, 3, 2, 1]

Filetered values with threshold = 3 : 
[5, 9, 15, 11, 7, 3]
Run Code Online (Sandbox Code Playgroud)

为了更好地说明它,我们假设它是温度读数,例如.我想只过滤温度变化超过3度的那些值.第一个值总是通过并作为初始值进行比较.例如,如果第一温度读数是21摄氏度并且在一小时内每个后续读数在[18..24],那么这些值都不应该通过下游.但是一旦它越过这些边界就应该传递并创建新的边界来进行比较.

问题是:如何仅使用RxJava运算符执行此操作?或者除了在rx管道外存储状态之外别无其他方式(在易失性或原子引用或任何其他同步状态,细节无关紧要)?

小智 5

你可以使用的组合scan,并distinctUntilChanged与刚刚RX运营商实现这一目标.

scan 允许您访问上一个和下一个值,允许您与给定的阈值进行比较.

如果未满足阈值,则可以只发出先前的值.

distinctUntilChanged 然后可以用来减少重复排放.

在Kotlin中,这将如下所示:

val threshold = 3
val list = listOf(5, 4, 7, 9, 15, 14, 13, 12, 11, 7, 3, 2, 1)

return Observable.fromIterable<Int>(list)
        .scan { previous: Int, next: Int ->
            val difference = abs(next - previous)
            when {
                difference > threshold -> next
                else -> previous
            }
        }.distinctUntilChanged()
Run Code Online (Sandbox Code Playgroud)