RXJS 等待所有 Observables 完成并返回结果

amc*_*dnl 5 rxjs ngrx

我正在尝试创建一个 RX 流,它将执行异步的 XHR 调用列表,然后在进行下一次调用之前等待它们完成。

为了帮助解释这一点,可以在普通 JS 中这样写:

try {
    await* [
        ...requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
    ];
} catch(e) { throw e }

// do something
Run Code Online (Sandbox Code Playgroud)

这是我正在尝试的代码,但它单独运行它们,而不是在继续之前等待它们全部完成。(这是一个 NGRX 效果流,因此它与 vanilla rx 略有不同)。

mergeMap(
        () => this.requests, concatMap((resqests) => from(resqests))),
        (request) =>
            this.myAngularHttpService
                .get(`foo/bar/${request}`)
                .pipe(catchError(e => of(new HttpError(e))))
    ),
    switchMap(res => new DeleteSuccess())
Run Code Online (Sandbox Code Playgroud)

Sal*_*ani 5

您可以使用forkJoin,它将从每个已完成的可观察量中发出最后发出的值。以下是链接文档中的示例:

    import { mergeMap } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';
    
    const myPromise = val =>
      new Promise(resolve =>
        setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
      );
    
    const source = of([1, 2, 3, 4, 5]);
    //emit array of all 5 results
    const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
    /*
      output:
      [
       "Promise Resolved: 1",
       "Promise Resolved: 2",
       "Promise Resolved: 3",
       "Promise Resolved: 4",
       "Promise Resolved: 5"
      ]
    */
    const subscribe = example.subscribe(val => console.log(val));

Run Code Online (Sandbox Code Playgroud)

还有Peter B Smith的这个不错的食谱,也用于forkJoin相同的建议,我将复制/粘贴其内容如下:


复制自:https ://gist.github.com/peterbsmyth/ce94c0a5ddceb99bab24a761731d1f07


使用 @ngrx/Effects 进行链式 API 调用

目的

此配方对于通过单个操作来烹饪链式 API 调用非常有用。

描述

在下面的示例中,调度了一个名为 的操作POST_REPO,其目的是在 GitHub 上创建一个新的存储库,然后在创建后使用新数据更新 README。为此,GitHub API 需要 4 个 API 调用:

  1. 发布一个新的转发
  2. 获取新存储库的主分支
  3. 获取master分支上的文件
  4. 放置 README.md 文件

Payload.repoPOST_REPO's payload contains 包含 API 调用 1 所需的信息。API 调用 1 的响应对于 API 调用 2 是必需的。API 调用 2 的响应对于 API 调用 3 是必需的。API 调用 3 的响应和 `payload.file,其中包含更新 README.md 文件所需的信息,对于 API 调用 4 是必需的。

使用Observable.ForkJoin使这成为可能。

例子

import { Injectable } from '@angular/core';
import { Effect, Actions } from '@ngrx/effects';
import { Action } from '@ngrx/store';
import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { handleError } from './handleError';


import { GithubService } from '../services/github.service';
import * as githubActions from '../actions/github';

@Injectable()
export class GitHubEffects {
  @Effect()
  postRepo$: Observable<Action> = this.actions$
    .ofType(githubActions.POST_REPO)
    .map((action: githubActions.PostRepo) => action.payload)
    // return the payload and POST the repo
    .switchMap((payload: any) => Observable.forkJoin([
      Observable.of(payload),
      this.githubService.postRepo(payload.repo)
    ]))
    // return the repo and the master branch as an array
    .switchMap((data: any) => {
      const [payload, repo] = data;
      return Observable.forkJoin([
        Observable.of(payload),
        Observable.of(repo),
        this.githubService.getMasterBranch(repo.name)
      ]);
    })
    // return the payload, the repo, and get the sha for README
    .switchMap((data: any) => {
      const [payload, repo, branch] = data;
      return Observable.forkJoin([
        Observable.of(payload),
        Observable.of(repo),
        this.githubService.getFiles(repo.name, branch)
          .map((files: any) => files.tree
            .filter(file => file.path === 'README.md')
            .map(file => file.sha)[0]
          )
      ]);
    })
    // update README with data from payload.file
    .switchMap((data: any) => {
      const [payload, repo, sha] = data;
      payload.file.sha = sha;
      return this.githubService.putFile(repo.name, payload.file);
    });

  constructor(
    private actions$: Actions,
    private githubService: GithubService,
  ) {}
}
Run Code Online (Sandbox Code Playgroud)