RXJS - 3 个并行可观察量,然后发出值并使用它来串联调用 3 个可观察量

B.B*_*njo 2 rxjs angular

关闭

也许我的问题有点复杂,但我不知道如何实现我想要的。

上下文
三个并行可观察量发出值,然后,当我拥有所有三个值时,我对它们进行一些修改,然后我想串联调用三个可观察量。就像下图一样: 在此输入图像描述

现在?
目前,我设法通过将三个并行可观察量放入zip运算符中来做到这一点,然后订阅它,修改值,完成后调用另一个,订阅,然后完成......三次!

this.service.Function(id) //return zip(Ob1, Ob2, Ob3)
  .subscribe(
    ([val1, val2, val3]) => {
      /*DO SOMETHING*/
      this.tmp1 = val1;
      this.tmp2 = val2;
      this.tmp3 = val3;
    },
    () => {}, //on error
    () => { //on complete
      let newV1, newV2, newV3 = [];
      [newV1, newV2, newV3 ] = [
        this.tmp1.map(x => x.id2),
        this.tmp2.map(x => x.id2),
        this.tmp3.map(x => x.id2)
      ];
      this.service.Function2(newV1)
        .subscribe(res => {
            //DO SOMETHING
          },
          () => {},
          () => { //on complete
            this.service.Function2(newV2)
              .subscribe(res => {
                  //DO SOMETHING
                },
                () => {},
                () => { //on complete
                  this.service.Function2(newV3)
                    .subscribe(res => {
                        //DO SOMETHING
                      },
                      () => {},
                      () => {
                        //DO SOMETHING
                      });
                });
          });
    }
  );
Run Code Online (Sandbox Code Playgroud)

我尝试过使用switchMapconcat
尝试不同的方法,但 concat 不会将我的值返回为数组...

this.kycService.getDocuments(idNotif).pipe(
  switchMap(([val1, val2, val3]) => {
    this.tmp1 = val1;
    this.tmp2 = val2;
    this.tmp3 = val3;

    let newV1, newV2, newV3 = [];
      [newV1, newV2, newV3 ] = [
        this.tmp1.map(x => x.id2),
        this.tmp2.map(x => x.id2),
        this.tmp3.map(x => x.id2)
      ];
    return concat(this.service.Function2(newV1),this.service.Function2(newV2), this.service.Function2(newV3))
  }))
  .subscribe(([Ob_newV1, Ob_newV2, Ob_newV3]) => {
    //DO SOMETHING
    //[Ob_newV1, Ob_newV2, Ob_newV3] Doesn't work, I need to do val => {}
  })
);
Run Code Online (Sandbox Code Playgroud)

如果您对使用什么有任何建议,我对 RXJS 中的所有运算符/函数有点困惑。

先感谢您

我的解决方案

    this.kycService.getDocuments(idNotif).pipe(
  switchMap(([val1, val2, val3]) => {
    this.tmp1 = val1;
    this.tmp2 = val2;
    this.tmp3 = val3;

    let newV1, newV2, newV3 = [];
      [newV1, newV2, newV3 ] = [
        this.tmp1.map(x => x.id2),
        this.tmp2.map(x => x.id2),
        this.tmp3.map(x => x.id2)
      ];
    return concat(this.service.Function2(newV1),this.service.Function2(newV2), this.service.Function2(newV3))
  }))
  .subscribe((val) => {
    //DO SOMETHING
    //val is emitted every time my function2 complete, so I manage to deal with this and rearrange my data
  })
);
Run Code Online (Sandbox Code Playgroud)

小智 6

取决于您的可观察类型。

对于热门可观察对象(主题、商店等),您将使用combineLatest.

对于冷可观察量(HTTP 调用、promise 等),您将使用forkJoin.

我们假设它们很冷。

forkJoin(
  first$.pipe(
    map(result => /* transformation of your first observable */),
    switchMap(result => this.myService.getNextObservableFromFirst())
  ),
  second$.pipe(
    map(result => /* transformation of your second observable */),
    switchMap(result => this.myService.getNextObservableFromSecond())
  ),
  third$.pipe(
    map(result => /* transformation of your third observable */),
    switchMap(result => this.myService.getNextObservableFromThird())
  ),

).subscribe([r1, r2, r3] => /* What to do once all calls are completed */);
Run Code Online (Sandbox Code Playgroud)

看来适合你的情况,它会给

forkJoin(
  first$.pipe(
    map(result => result.id2),
    switchMap(result => this.myService.Function2(result))
  ),
  second$.pipe(
    map(result => result.id2),
    switchMap(result => this.myService.Function2(result))
  ),
  third$.pipe(
    map(result => result.id2),
    switchMap(result => this.myService.Function2(result))
  ),

).subscribe([r1, r2, r3] => /* DO SOMETHING */);
Run Code Online (Sandbox Code Playgroud)