RXJS等待数组中的所有可观察对象完成(或错误)

jos*_*chs 44 javascript rxjs

我正在将observable推入像这样的数组......

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));
Run Code Online (Sandbox Code Playgroud)

我想要一个在所有任务$完成后发出的Observable.请记住,在实践中,任务$没有已知数量的Observable.

我已经尝试了,Observable.zip(tasks$).subscribe()但是如果只有一个任务,这似乎失败了,并且让我相信ZIP需要偶数个元素才能按照我期望的方式工作.

我试过Observable.concat(tasks$).subscribe()但是concat运算符的结果似乎是一个可观察数组......例如与输入基本相同.你甚至不能打电话订阅它.

在C#中,这将类似于Task.WhenAll().在ES6承诺它将类似于Promise.all().

我遇到了一些SO问题,但它们似乎都在等待已知数量的流(例如将它们映射到一起).

car*_*ant 74

如果要编写在所有源可观察对象完成时发出的可观察对象,可以使用forkJoin:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
Run Code Online (Sandbox Code Playgroud)

  • 如果`tasks`是动态构建并且为空,请小心,forkJoin将停止可观察序列.有关详细信息,请参阅我的答案http://stackoverflow.com/a/42622968/1224564 (13认同)
  • 对于 rxjs 6:`import {forkJoin} from 'rxjs';` 和 `forkJoin(...tasks$).subscribe(results => { console.log(results); });`。 (4认同)

Joã*_*tti 7

您可以使用zip

组合多个Observable来创建一个Observable,其值是根据其每个输入Observable的值依次计算得出的。

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);
Run Code Online (Sandbox Code Playgroud)

注意事项:

  • 不需要是偶数个可观察值。
  • zip 视觉上
  • 在此处输入图片说明 至于说在这里

    zip运算符将订阅所有内部可观测对象,等待每个可观测对象发出值。一旦发生这种情况,将发出具有相应索引的所有值。这将持续到至少一个内部可观察到的对象完成为止。

  • 当一个可观察对象抛出一个错误(或什至两个都发生)时,订阅将关闭(onComplete在完成时被调用),并且使用一种onError方法,您只会得到第一个错误。
  • zip.subscribe(
      result => console.log(result), // result is an array with the responses [respA, respB]
      error => console.log(error), // will return the error message of the first observable that throws error and then finish it
      () => console.log ('completed after first error or if first observable finishes)
    );
    
    Run Code Online (Sandbox Code Playgroud)

    • 注意:使用 rxjs6,您可以在管道中使用“zip”,但必须使用“zipAll”。如果您确实使用 zip,您将收到弃用警告,并且正确的输入将无法通过。但是,您可以仅使用“zip(...)”静态地使用 zip(不再是“Observable.zip(...)”) (2认同)