如何使用 RxJs 限制 Angular 中的并发 API 请求

Isa*_*ldt 2 rxjs angular rxjs-observables

我有一个 Angular 7 应用程序,允许您多次上传文件。对于选择的每个文件,它都会向服务器发出请求。这不太好,因为打开数百个并发调用对于我的后端服务器来说似乎是一个潜在问题。

我想做的是限制我的应用程序可以发出的并发请求的数量。我有一个通用 API 类,我想用它来在应用程序范围内进行限制,而不仅仅是文件上传,而不需要文件上传组件本身来管理它。

诚然,我有时对 RxJx 感到困惑,但我很确定这是可能的。

class ApiService {

    get(path: string, params: any = {}): Observable<any> {
        return this.http.get(path`, { params: params });
    }
    
    uploadFile(path: string, body: any = {}): Observable<any> {
        ...code for preparing file here...
        return this.http.post(path, body);
    }
    
}

class FileUploader {

    // called many times-- once for each file
    uploadFile(file) {
        this.apiService.uploadFile(path, body: file).subscribe(response => {
             // use response here
        })
    }
}

Run Code Online (Sandbox Code Playgroud)

我想象的是,在 api 类中,我可以添加到使用最大并发度或其他值的队列中,然后等待调用,直到有空间为止,而不是立即在 fileUpload 或 get 函数中执行 http 调用。但鉴于我立即在文件上传器类中订阅,我不确定如何执行此操作。

PeS*_*PeS 7

您可以使用SubjectandmergeMap运算符

interface FileUpload {
   path: string;
   body: file;
}

export class UploadService {
  private readonly CONCURRENT_UPLOADS = 2;
  private uploadQ = new Subject<FileUpload>();

  constructor(private api: ApiService) {
    this.uploadQ.asObservable().pipe(
      mergeMap(fu => this.api.uploadFile(fu.path, fu.body)).pipe(
        // must catch error here otherwise the subscriber will fail
        // and will stop serving the Q
        catchError(err => {
          console.error('Caught error ', err);
          return of(err);
        })), this.CONCURRENT_UPLOADS),
    ).subscribe((res: WhateverResultYouGet) => {
         // process result  
      }, err => {
        // something went wrong
      });
  }

  // this is your original signature of the method but where do you get path, actually?
  /**
  * Push the file to upload Q
  */
  uploadFile(file) {
    this.uploadQ.next({path, body: file});
  }
}
Run Code Online (Sandbox Code Playgroud)

您只需将上传推送到队列即可,而不是立即启动上传。队列由构造函数中的订阅提供服务,使用mergeMap运算符,您可以在其中实际指定并发性。