可观察的forkJoin不触发

nic*_*ick 7 rxjs typescript rxjs5 angular

我正在尝试使用forkJoin两个Observables。其中一个以流的形式开始...如果我直接订阅它们,forkJoin则会收到响应,但不会触发。有任何想法吗?

private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();    

....

this.data$ = this.queryStream
    .startWith('')
     .flatMap(queryInput => {
            this.query = queryInput
            return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
                })
            .share();

...

Observable.forkJoin(this.statuses$, this.companies$)
            .subscribe(res => {
                console.log('forkjoin');
                this._countStatus(res[0], res[1]);
            });


// This shows arrays in the console...

this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));

// In the console
Array[9]
Array[6]
Run Code Online (Sandbox Code Playgroud)

use*_*686 11

forkJoin仅在所有内部可观测量完成时才发射。如果您需要与之等效的功能forkJoin,只需监听每个来源的单个发射,请使用combineLatest+take(1)

combineLatest(
  this.statuses$,
  this.companies$,
)
.pipe(
  take(1),
)
.subscribe(([statuses, companies]) => {
  console.log('forkjoin');
  this._countStatus(statuses, companies);
});
Run Code Online (Sandbox Code Playgroud)

两个来源一经发出,combineLatest便会发出并take(1)在此之后立即退订。

  • mergeLatest 的问题在于它返回首先触发的可观察对象的数据。我必须更改为 forkjoin,因为我需要等待两个可观察对象相互等待。 (2认同)

mar*_*tin 7

一个非常常见的问题forkJoin是,它要求所有源Observable都发出至少一个项目,并且所有项目都必须完成。

换句话说,this.statuses$无论this.companies$是否发射任何东西,直到它们都完成,forkJoin才发射任何东西。

this.statuses$.subscribe(
    res => console.log(res),
    undefined,
    () => console.log('completed'),
);
Run Code Online (Sandbox Code Playgroud)

  • 直到您告诉主题,主题才会完成。因此,您需要手动调用`queryStream.complete()`。但这取决于您的应用程序逻辑,也许您可​​以仅使用“ take(1)”而无需完成“主题”。查看更新如何检查Observable完成 (3认同)

Ric*_*e50 6

不起作用forkJoin,所以我使用下面的代码来解决我的问题。您可以mergeMap将外部订阅的结果映射到内部订阅并根据需要进行订阅

this.statuses$.pipe(
    mergeMap(source => this.companies$.pipe(
        map(inner => [source , inner])
        )
    )
).subscribe(([e , r]) => {
    console.log(e , r);
})
Run Code Online (Sandbox Code Playgroud)