如何合并两个流(没有空值)并在对上应用条件?

ope*_*eek 6 reactive-programming rxjs reactive-streams

考虑我有两个数据流,有没有办法合并它们并在这两个流之间应用数据条件?例如

Stream A : A, B, C, D....
Stream B : -, A, -, -....
Composed : (A,-),(B,A),(C,-),(D,-)....
Run Code Online (Sandbox Code Playgroud)

如何使用rxjs获得上面的组合流?我想在组合流上应用条件来提出一些通知.也可以使用最后已知的非空数据,例如,参见下面的组合流.

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,2),(D,2)....
Run Code Online (Sandbox Code Playgroud)

我刚开始玩反应流的想法,所以如果我误解了反应流的想法,请纠正我.

J. *_*non 12

有两个运营商可以为您的建议服务.

邮编:
Rx Zip
参考RxJs:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md

CombineLatest:
Rx CombineLatest
参考RxJs:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/combinelatest.md

图像解释了两者之间的差异.现在你已经合并了你需要过滤的observable,使用where,如果其中一个值为null,它将被过滤.

不幸的是,操作员都无法获得您描述的此行为:

Stream A : A, B, C, D, E....
Stream B : 1, null, 2, null, 3....
Composed : (A,1),(B,1),(C,2),(D,2)....
Run Code Online (Sandbox Code Playgroud)

如果您使用Zip和Where(之后过滤空值),结果将是:

Composed: (A,1),(C,2),(E,3)
Run Code Online (Sandbox Code Playgroud)

如果使用Where(先前过滤空值)和Zip,结果将是:

Composed: (A,1),(B,2),(C,3)
Run Code Online (Sandbox Code Playgroud)

如果您使用CombineLatest将取决于Streams中事件发生的顺序,当然,您将放置where运算符的位置,结果可能与您显示的不同,例如:

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,1),(C,2),(D,2).... // OR
Composed : (A,1),(B,1),(B,2),(C,2),(D,2).... 
Run Code Online (Sandbox Code Playgroud)

除非您有更具体的要求,否则我认为我提到的其中一个选项就是您要寻找的,随时添加信息.

有几种方法可以组成observable,其他未提及的运算符是:

  • distinctUntilChanged,可以在合成的最后添加,使用键选择器功能来限制zip或最新值的一部分.
  • 开关,用于将一个可观察的内部组合在一起.