rxjs - 观察多个可观察值,在任何变化时做出响应

syn*_*t1c 13 rxjs typescript

我希望能够观察多个可观察量,并在其中任何一个发生变化时执行一些操作。与zip 的工作原理类似,但不需要每个可观察值更改其值。同样forkJoin也不适合,因为它仅在所有观察到的可观察对象都触发时才会触发。

例如。理想情况下,当任何可观察值发生变化时one$,订阅函数应该触发给出可观察值的当前状态。目前我正在使用BehavourSubject 的值应该可以使用two$three$BehaviourSubject.getValue()

magicCombinator(
  one$,
  two$,
  three$,
).subscribe(([ one, two, three ]) => {
  console.log({ one, two, three });
})
Run Code Online (Sandbox Code Playgroud)

这样的组合器存在吗?

如果需要,这里有一个带有示例代码的stackblitz 。

当前的工作代码将可观察量合并到单个流中,并将每个流的结果缓存到 BehavourSubjects 中。

const magicCombinator = <T = any>(...observables: Observable<T>[]) =>
  new Observable((subscriber: Subscriber<T>) => {
    // convert the given Observables into BehaviourSubjects so we can synchronously get the values
    const cache = observables.map(toBehavourSubject);
    // combine all the observables into a single stream
    merge(...observables)
      // map the stream output to the values of the BehavourSubjects
      .pipe(map(() => cache.map(item => item.getValue())))
      // trigger the combinator Observable
      .subscribe(subscriber);
  });
Run Code Online (Sandbox Code Playgroud)

Pan*_*kos 7

这就是你需要的

合并

创建一个输出 Observable,它同时发出每个给定输入 Observable 的所有值

请参阅合并文档

您可以将所有可观察量合并在一起,并通过单个订阅来监听每个可观察量发出的值。

我希望能够观察多个可观察量,并在其中任何一个发生变化时执行一些操作。与 zip 的工作原理类似,但不需要每个可观察值更改其值。

Obsevables 不会改变值,也不会保持状态。它们只是发出值,直到完成然后关闭。

如果您希望能够在给定时间获得所有发出的值,那么您应该使用 ReplaySubject。关于重播主题的好文章但是每次您想要再次检索所有发出的值时都需要进行新的订阅。

  • @BizzyBob `combineLatest` 仅当所有可观察对象至少触发一次时才会触发,理想情况下我想要相同的行为,但不要求每个可观察对象触发一次。 (2认同)