RxJS Angular:监听一个可观察值并使用其他两个可观察值的最新值

Ale*_*lex 4 reactive-programming rxjs angular

我正在使用 Redux 和 RxJS Angular (4) 实现来将状态和操作反应性地管理到我的应用程序中。

我正在监听一个事件(产品列表中所选产品 ID 的更改),因此我们将其称为selectedProductId$.

当相关的发射器发出一个新的 selectedId 值时,我想从这个 Observable 值中执行 a 操作switchMap,以及从其他两个 Observables 中发出的最新值(比如说;productDetails$productComments$)。

因此,“触发更改”应该是 selectedProductID,我们将switchMap使用 ProductDetails 和 ProductComments 的最新值进行操作。

我试过这个:

// subscribed to the 3 observable passed in combineLatest
 fetchProduct$: Observable<any>;
 this.fetchProduct$ = Observable.combineLatest(
       this.selectedProductId$,
       this.productComments$,
       this.productDetails$
 ).switchMap(
       result => Observable.of(
         {
           'uuid': result[0],
           'comments': result[1],
           'details': result[2]
         }
      )
 );
Run Code Online (Sandbox Code Playgroud)

然后我订阅这个 observable 并在传递的值上放置一个断点:

this.fetchProductSubscription = this.fetchProduct$.subscribe(result => this.someFn(value));
Run Code Online (Sandbox Code Playgroud)

它可以工作,但每次三个 Observables 之一检测到状态变化时都会被调用。我也尝试过 Observable.forkJoin,但从文档来看,它等待三个 Observable 发出新值,这让我感到很奇怪,因为我从另一个 Web 服务获得注释,并且在选择下一个产品 ID 时它可以检索它。

Observable.merge 也不是一种解决方案,因为它只会在一个“通道”上一一发出值。

我还没有完全习惯反应式编程,也许我没有清楚地看到明显的解决方案,但我知道我会喜欢提示或一些帮助来解决这个问题:( 非常感谢

mag*_*tic 5

Try withLatestFrom,其工作方式类似于combineLatest()但每次源可观察对象触发时都会触发。示例(对于 rxjs 5.5):

this.fetchProduct$ = 
  this.selectedProductId$.pipe(
   withLatestFrom(
       this.productComments$,
       this.productDetails$
   ));
Run Code Online (Sandbox Code Playgroud)

或者对于早期版本:

this.fetchProduct$ = 
  this.selectedProductId$.withLatestFrom(
       this.productComments$,
       this.productDetails$
   );
Run Code Online (Sandbox Code Playgroud)