如何使用 RxJS 运算符避免多个嵌套订阅?

Ben*_*158 9 angular rxjs6

我正在使用 Angular 进行文件加密和上传类。其中许多操作是异步的,因此我编写的方法返回 RxJS Observables。

// 1.
private prepareUpload(file): Observable<T>;

// 2.
private encryptData(data, filekey): Observable<T>

// 3.
private uploadEncryptedData(formData, token, range): Observable<T>

// 4.
private completeUpload(updatedFilekey, token): Observable<T>
Run Code Online (Sandbox Code Playgroud)

我想将这个逻辑封装在一个公共upload(file)方法中,我最终使用了嵌套订阅,它可以工作,但我知道它是错误的,并且是 RxJS 中的反模式,原因有几个。这是代码的简化版本:

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    this.prepareUpload(file).subscribe(values => {
    const [response, filekey, data] = values;

    this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
      const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
      const range = this.getRange(file.size, gen.next().value);

      this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
        if (range.isFinalPart) {
            this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
        }
      });

    });

  });

}
Run Code Online (Sandbox Code Playgroud)

我无法使用多个 RxJS 运算符的组合来清理此代码。我的目标是避免嵌套订阅,而是upload()在工作流完成时从公共方法返回单个 Observable 。

谢谢!

Muh*_*yaz 7

您可以使用RxJs 中的mergeMapfilter运算符并链接您的调用。您将需要创建一些在链接期间使用的函数级变量。

import { mergeMap, filter, catchError } from 'rxjs/operators`
Run Code Online (Sandbox Code Playgroud)
public upload(file) {
    const gen = this.indexGenerator(); // generator function
    let range, token;
    this.prepareUpload(file)
      .pipe(
        mergeMap((values) => {
          const [response, filekey, data] = values;
          token = response.token;
          return this.encryptData(data, filekey);
        }),
        mergeMap(encryptedDataContainer => {
          const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
          range = this.getRange(file.size, gen.next().value);

          return this.uploadEncryptedData(formData, token, range);
        }),
        filter(() => !!range.isFinalPart),
        mergeMap(() => {
          return this.completeUpload(encryptedDataContainer.updatedFilekey, token);
        })
        catchError((error) => {
          console.log(error);
          // handle the error accordingly.
        })
      )
      .subscribe(() => {
        console.log('success');
      });

  }
Run Code Online (Sandbox Code Playgroud)