是否可以在运行时将可观察量添加到combineLatest(或类似的)中?

dis*_*nte 4 rxjs angular

我有这个用例。当布尔可观察值列表中其中一个为真时,组件不应该是可见的。这是简单的部分,为此,我的服务有一个isBlocked$公共可观察的 API。

困难的部分是我需要在运行时注册新的可观察量,但保留我的isBlocked$引用,这样我当前的订阅就不会中断。

例子:

class MyService {
   isBlocked$: Observable<boolean> =  ?????

   
  add(observable: Observable<boolean>){
   }

}


// my component
...

myService.isBlocked$.subscribe(isBlocked => isBlocked)
// isBlocked = false
myService.add(of(true));
// isBlocked = true
Run Code Online (Sandbox Code Playgroud)

max*_*992 5

这是有可能实现的。

您必须使用更高阶的运算符,例如mergeAll

在你的情况下,它应该类似于以下内容:

class MyService {
  observables$: Subject<Observable<any>> = new Subject();

  isBlocked$: Observable<boolean> = this.observables$.pipe(mergeAll());

  add(observable: Observable<boolean>){
    this.observables$.next(observable);
  }
}
Run Code Online (Sandbox Code Playgroud)

如果您需要以不同于 的方式处理流的流mergeAll,您还可以检查以下内容:
- - -concatAll
zipAll
switch

编辑:

根据要求,这是一个示例:https ://stackblitz.com/edit/rxjs-svzjk5

(启动应用程序并打开控制台)

import { of, Subject, Observable, interval } from 'rxjs'; 
import { map, mergeAll, tap } from 'rxjs/operators';

const animals$$: Subject<Observable<string>> = new Subject();

const addAnimals = (animals: Observable<string>) => animals$$.next(animals);

const animals$: Observable<string> = animals$$.pipe(
  mergeAll()
);

// display the values in the stream
animals$.pipe(
  tap(console.log)
).subscribe()

// add some animals
addAnimals(
  interval(1000).pipe(map((i) => `Wolf ${i}`))
);
addAnimals(
  interval(1500).pipe(map((i) => `Cat ${i}`))
);
addAnimals(
  interval(600).pipe(map((i) => `Dog ${i}`))
);
Run Code Online (Sandbox Code Playgroud)