RxJS如何在继续之前等待两个ReplaySubjects生成值

Ian*_*nT8 3 observable rxjs

我想用于这样的连接:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});
Run Code Online (Sandbox Code Playgroud)

但是console.log永远不会触发.

我猜测重播主题永远不会完成,这就是为什么forkJoin永远不会停止等待.

我试过这个,但这也行不通.我首先尝试了ReplaySubject,然后尝试了Observable,反之亦然.

Observable.forkJoin(
    Observable.from(this.service1.dataSourceIsAReplaySubject),
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});
Run Code Online (Sandbox Code Playgroud)

我也试过调用share方法:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject.share(),
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});
Run Code Online (Sandbox Code Playgroud)

我该怎么做才能等待两个ReplaySubjects或1个ReplaySubject和1个Observable的混合

pau*_*els 8

正如您自己指出的那样,问题是forkJoin只有在所有流完成后才会发出.如果你要使用它,你将需要确保以某种方式所有流的完整(first(),take(1),等...).

一个更好的选择可能是使用zipcombineLatest根据您的需要.实现看起来类似:

Observable.zip(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable
).subscribe(values => {
    console.log('inside subscribe values is', values);
});

// OR

Observable.combineLatest(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable
).subscribe(values => {
    console.log('inside subscribe values is', values);
});
Run Code Online (Sandbox Code Playgroud)

分解

zip 将两个Observables相互锁定发出,即每次发射一个,另一个也必须在zip发出一个值之前发出,这是有用的,如果一个发射比另一个快得多,它也可能是危险的(因为它会导致无限制的缓冲区在幕后成长).

combineLatest将等待两个流至少发出一次,然后任何时候发出,combineLatest将发出最新的另一个流.

还有第三种选择,称为withLatestFrom.它的功能类似于combineLatest警告,它只在第一个流发出并静默更新所有其他最新值而不发出时才会发出.


car*_*ant 5

forkJoin在传递给它的可观察对象完成之前不会发出值;它发出每个 observables的最后一个值。

您可以使用first运算符确保它们完成:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject.first(),
    this.service2.dataSourceIsAnObservable.first())
.subscribe(values => {
    console.log('inside subscribe values is', values);
});
Run Code Online (Sandbox Code Playgroud)