使用包含来自端点的数组的 observable 来查询其他端点并组合成单个 observable

Jar*_*ple 1 rxjs angular

我想使用 rxjs 创建一个方法来查询端点,该端点返回一个对象数组,然后使用该响应中的数据从另一个端点获取有关数组中项目的其他详细信息,最后返回一个对象数组作为可观察到的。

这段代码可以工作,但我想在没有点击操作符内部订阅的情况下执行此操作。

testMethod() {
  this.testService.getItemList().pipe(
    tap((items) => {
      items.forEach((item, i) => {
        this.itemArr.push(item);
        this.testService.getItemDetails(item.id).subscribe(itemDetails => {
          this.itemArr[i]['details'] = itemDetails;
        });
      });
    })
  ).subscribe();
}
Run Code Online (Sandbox Code Playgroud)

Pic*_*cci 5

您可以通过多种方式使用 rxJs 执行此类操作。它们管理对第二端点(即返回每个项目的详细信息的端点)调用的并行性的方式有所不同。

最大并行度

假设您从第一个端点接收到数组中的 10 个项目,并且您希望并行运行对第二个端点的所有 10 个调用。forkJoin在这种情况下,您可以像这样使用运算符

testMethod() {
  this.testService.getItemList().pipe(
    // transform the array of items into an array of Observables
    // note that the outer map is the rxJs operator while the inner one is the
    // the javascript array method
    // note also that we return the array of items since we will need it later
    map(items => [items.map(item => 
       this.testService.getItemDetails(item.id)), items]
    ),
    // then switch to a new Observable which will emit when all of the calls
    // to the second endpoint have returned
    switchMap(([arrayOfObs, items]) => forkJoin(arrayOfObs).pipe(
       // return both the results of the calls to the second endpoint and the
       // original array of items
       map(itemDetails => [itemDetails, items])
    )),
    // finally augment the original array of items with the detail info
    // as in your original code but with no subscription any more
    tap(([itemDetails, items]) => {
      items.forEach((item, i) => {
        this.itemArr.push(item);
        this.itemArr[i]['details'] = itemDetails[i];
      });
    })
  ).subscribe();
}
Run Code Online (Sandbox Code Playgroud)

没有并行性

如果您确实想顺序执行对第二个端点的调用,您可以使用concatMap如下运算符

testMethod() {
  this.testService.getItemList().pipe(
    // transform the array of items into a new stream which notifies sequentially
    // each item in the array - we use the from rxJs function to create the new stream (i.e. the new Observable)
    switchMap(items => from(items)),
    // then concatenate the calls to the second endpoint with concatMap
    concatMap(item => this.testService.getItemDetails(item.id).pipe(
       // return the original item with its details
       map(itemDetail => {
         item['details'] = itemDetail;
         return item
       })
    )),
    // finally gather all items into an array
    toArray()
  ).subscribe();
}
Run Code Online (Sandbox Code Playgroud)

受控并发

如果您想要一定程度的并行性,例如最多并行 5 个调用,您可以使用 的第二个参数contactMap指定mergeMap并发级别mergeMap