rxjs/Observable:获取第一个流后运行一次函数(连续可观察)

shi*_*les 6 observable rxjs angularfire2 angular

首先,抱歉长标题.

我想订阅连续流与数组的forEachangularfire2,但我也想我已经证实后,第一组数据已经在运行一个函数:

this.people.forEach((person) => {
    person.items = this.database.list('/items' + person.key);
    person.items.subscribe((data) => {person.itemsList = data});
});

myIntendedFunction();
Run Code Online (Sandbox Code Playgroud)

有没有办法myIntendedFunction()这样:

  1. 它在data每个收到第一个流之后运行person,并且
  2. 它只运行一次?

j2L*_*L4e 6

如果您不希望第一个请求发生两次,那么实现起来会稍微复杂一些:

const connectables: ConnectableObservable<any>[] = [];

this.people.forEach(person => {
    person.items = this.database.list('/items' + person.key);
    const connectable = person.items.publish();
    connectables.push(connectable);
    connectable.subscribe((data) => {person.itemsList = data});
});

Observable.zip(...connectables).take(1).subscribe(myIntendedFunction);

connectables.forEach(c => c.connect());
Run Code Online (Sandbox Code Playgroud)

这里发生的事情是: 的效果publish()基本上是您可以多次订阅同一个数据流。此外,在您调用connect(). 如果我们使用person.items.share()which is sugar for person.items.publish().connect(),它会立即发出请求,并且我们的应用程序可能会由于竞争条件而出现问题。

zip()等待每个传递的 observable 发出一个项目,并将这些项目作为数组一次性发出。我们只希望这发生在第一组项目上,所以我们只需要take(1).