combineLatest只在其中一个流发生变化时才会发出

dte*_*ech 6 scala reactive-programming rx-java

我有一个频繁值的流和一个慢的流.我希望将它们组合起来,但只在较慢的一个发出时才发出一个值.所以combineLatest不起作用.像这样:

a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)
Run Code Online (Sandbox Code Playgroud)

目前我正在这样做,有更清洁的方式吗?

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] =
  Observable({ o =>
    var last : A
    fast.subscribe({a => last = a})
    slow.subscribe({b => o.onNext((last,b))})
  })
Run Code Online (Sandbox Code Playgroud)

编辑:此运算符现在位于Rx中,并使用LatestFrom调用.

And*_*ltz 4

您正在寻找的是一个我称之为“combinePrev”的组合器,它在 API 中不存在,但在许多情况下却非常必要。sample运算符很接近,但它不会合并两个流。我还错过了 RxJS 中的“combinePrev”。事实证明,“combinePrev”(“withLatest”)的实现很简单,只依赖于 map 和 switch:

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = {
  val hotSlow = slow.publish.refCount
  fast.map({a => hotSlow.map({b => (a,b)})}).switch
}
Run Code Online (Sandbox Code Playgroud)

这是在 RxJS 中实现的相同运算符的jsfiddle示例。

虽然运算符不在 Rx 中,但您可以使用隐式类,这样您就可以使用slow.withLatest(fast)

implicit class RXwithLatest[B](slow: Observable[B]) {
  def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}
Run Code Online (Sandbox Code Playgroud)

注意:slow必须是hot. 如果slow是冷的 Observable,则withLatest不起作用。

  • 该运算符现在在 RxJS 中实现为“withLatestFrom”:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/withlatestfrom.md 应该很快就会出现在 Rx JVM 中,但没有尚未指派一名人员实施该计划。 (2认同)
  • withLatestFrom 现在是最新版本中 RxJava 中的实验运算符:https://github.com/ReactiveX/RxJava/releases/tag/v1.0.7 (2认同)