RxJava/RxJs:如何合并两个源可观察对象,但只要其中一个完成就完成

jba*_*ndi 6 rxjs rx-java rx-java2

我有两个源可观察量.我想合并两个源可观察对象,但是只要其中一个源可观察对象完成,合并的可观察对象就会完成.

期望的行为:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x
Run Code Online (Sandbox Code Playgroud)

如果其中一个源出现错误,则错误应传播到合并的observable:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex
Run Code Online (Sandbox Code Playgroud)

"merge"运算符仅在两个源完成时完成合并流:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x
Run Code Online (Sandbox Code Playgroud)

我怎样才能实现我想要的行为?

Bob*_*ish 10

您需要使用元数据,每个observable的信息.为此,请materialize()在每个流上使用运算符,并dematerialize()在合并流上使用实际发出数据.

Observable.merge( observableA.materialize(),
                  observableB.materialize() )
  .takeWhile( notification -> notification.hasValue() )
  .dematerialize()
  .subscribe( ... );
Run Code Online (Sandbox Code Playgroud)

这将合并两个observable,直到其中一个完成或发出错误.