我在这里有点疯狂.我正在尝试创建一个Observable<BigDecimal>扩展函数(针对RxJava 2.x)来发出平均排放量,但是我得到了Single.zip()函数的编译错误.有没有人有任何想法我做错了什么?我试图明确我的所有类型,但是没有用......
import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal
fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }
//compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
Single.zip(it.sum().toSingle(), it.count()) {
sum, count -> sum / BigDecimal.valueOf(count)
}
}
Run Code Online (Sandbox Code Playgroud)
我知道如何在RxJava 2 中做到这一点。
而且我知道RxKotlin如何帮助解决类似问题。
但似乎 RxKotlin.Observables 没有列表重载的这个辅助函数,我无法弄清楚。你会怎么做?