是否有Rx运算符仅在流2发出事物时才组合最新的流1和2?

Mat*_*att 5 reactive-programming system.reactive rx-java

这是我尝试绘制大理石图 -

STREAM 1 = A----B----C---------D------> (magical operator) STREAM 2 = 1----------2-----3-----4---> STREAM 3 = 1A---------2C----3C----4D-->

我基本上是在寻找从流1和流2生成流3的东西.基本上,每当从流2发出一些东西时,它将它与流1中的最新内容相结合.combineLatest类似于我想要但我只想要从流中发出的东西3当从流2发出某些东西而不是流1时.这样的运营商是否存在?

Tom*_*řák 5

有一个运算符可以满足您的需求:一个重载sample需要另一个可观察而不是持续时间作为参数.文档在这里:https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#sample-or-throttlelast

用法(我将在scala中给出示例):

import rx.lang.scala.Observable
import scala.concurrent.duration
import duration._

def o = Observable.interval(100.milli)
def sampler = Observable.interval(180.milli)

// Often, you just need the sampled observable
o.sample(sampler).take(10).subscribe(x ? println(x +  ", "))
Thread.sleep(2000)
// or, as for your use case
o.combineLatest(sampler).sample(sampler).take(10).subscribe(x ? println(x +  ", "))
Thread.sleep(2000)
Run Code Online (Sandbox Code Playgroud)

输出:

0, 
2, 
4, 
6, 
7, 
9, 
11, 
13, 
15, 
16, 

(2,0), 
(4,1), 
(6,2), 
(7,3), 
(9,4), 
(11,5), 
(13,6), 
(15,7), 
(16,8), 
(18,9),
Run Code Online (Sandbox Code Playgroud)

稍微有一点,因为吞下了来自采样的observable的重复条目(参见https://github.com/ReactiveX/RxJava/issues/912上的讨论).除此之外,我认为这正是你要找的.


Mat*_*att 4

withLatestFrom似乎完全符合我正在寻找的东西 - http://rxmarbles.com/#withLatestFrom