如何使用ReactiveX Observables/Subjects处理进度更新?

Tar*_*deX 5 progress observable rxjs rx-java reactivex

我正在编写一个Angular应用程序,它使用ReactiveX API来处理异步操作.我之前在Android项目中使用过API,我非常喜欢它如何简化并发任务处理.但有一件事我不确定如何以正确的方式解决.

如何从正在进行的任务中更新观察者?在这种情况下的任务将花费时间来加载/创建复杂/大型对象,并且我能够返回中间进度,但不能返回对象本身.observable只能返回一个dataType.因此,我知道两种可能性.

  1. 创建一个具有进度字段和数据字段的对象.可以使用Observable.onNext(object)返回此对象.progress字段将在每个onNext上更新,而data字段为空,直到最后一个onNext,这将把它设置为加载的值.

  2. 创建两个可观察对象,一个数据可观察对象和一个进度可观察对象.观察者可以订阅进度更新可观察的进度以及最终加载/创建数据时可通知的数据.这些也可以选择性地压缩在一起进行一次订阅.

我使用了这两种技术,它们都有效,但我想知道是否有统一的标准,干净的方式,如何解决这个任务.当然,它也可以是一个全新的.我为每个解决方案开放.

Tar*_*deX 4

经过仔细考虑,我使用了类似于我的问题中的选项二的解决方案。主要可观察的内容与操作的实际结果有关。本例中为 http 请求,但文件迭代示例类似。它由“work”函数返回。

可以通过函数参数添加第二个观察者/订阅者。该订阅者只关心进度信息。这样,所有操作都是空安全的,并且不需要类型检查。

如果不需要进度 UI 更新,可以使用没有进度观察器的工作函数的第二个版本。

export class FileUploadService {

 doWork(formData: FormData, url: string): Subject<Response> {
    return this.privateDoWork(formData, url, null);
 }

 doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
    return this.privateDoWork(formData, url, progressObserver);
 }

 private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {

     return Observable.create(resultObserver => {
     let xhr: XMLHttpRequest = new XMLHttpRequest();
     xhr.open("POST", url);

     xhr.onload = (evt) => {
         if (progressObserver) {
            progressObserver.next(1);
            progressObserver.complete();
            }
         resultObserver.next((<any>evt.target).response);
         resultObserver.complete()
     };
     xhr.upload.onprogress = (evt) => {
         if (progressObserver) {
            progressObserver.next(evt.loaded / evt.total);
         }

     };
     xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
     xhr.onerror = (evt) => resultObserver.error("Error");

     xhr.send(formData);
     });
 }
Run Code Online (Sandbox Code Playgroud)

这是包含进度订阅者的函数调用。使用此解决方案,上传函数的调用者必须创建/处理/拆除进度订阅者。

 this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
    (result) => console.log(result),
    (error) => console.log(error),
    () => console.log("request Completed")
    );
Run Code Online (Sandbox Code Playgroud)

总体而言,与具有单个订阅的“配对”对象相比,我更喜欢此解决方案。没有必要的空处理,并且我得到了一个干净的关注点分离。

该示例是用 Typescript 编写的,但其他 ReactiveX 实现也应该可以实现类似的解决方案。