Synchronous, blocking subscribe call on an observable based on a custom ReplaySubject

Ste*_*ane 5 observable angular rxjs-observables

The view contains the following element:

<div *ngIf="showMe">Hello</div>

When this component method is called:

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

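the element is displayed in the view before any of the Progress log lines appear. That is how it should be. This HTTP-based download works fine.

However, when this component method is called: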

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

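the element is displayed in the view only after all of the Progress log lines. That is not how it should be. The observable based on the custom ReplaySubject does not work as expected: the element should be displayed before the Progress log lines, not after them.

I wanted to see whether one of the subscribe calls was blocking.

So I changed the two methods to: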

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

Here is the log output when calling the downloadDemo() method:

Progress: 0
Call done
Progress: 0
Progress: 0
Progress: 2
Progress: 3

We can see that the subscribe() call is non-blocking: it returns before the download has finished.

Here is the log output when calling the downloadSoundtrack() method:

Progress: 96
Progress: 97
Progress: 100
Call done

We can see that the subscribe() call is blocking: it does not return until every progress value has been emitted.
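
The difference between the two logs can be reproduced in isolation. The following is a minimal sketch, assuming only the rxjs package; the names log, asyncLog, sync$ and async$ are illustrative and do not come from the application code. An observable whose producer emits synchronously inside its subscribe function delivers every value before subscribe() returns, whereas one that emits from a timer callback lets subscribe() return first.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The producer runs to completion inside subscribe(), so every value
// is delivered before subscribe() returns.
const sync$ = new Observable<number>(subscriber => {
  for (const progress of [96, 97, 100]) {
    subscriber.next(progress);
  }
  subscriber.complete();
});

sync$.subscribe(progress => log.push('Progress: ' + progress));
log.push('Call done');

const asyncLog: string[] = [];

// The producer defers its work to a timer, so subscribe() returns at once
// and the value arrives on a later turn of the event loop.
const async$ = new Observable<number>(subscriber => {
  const handle = setTimeout(() => {
    subscriber.next(0);
    subscriber.complete();
  }, 0);
  // Teardown: cancel the pending timer on unsubscribe.
  return () => clearTimeout(handle);
});

async$.subscribe(progress => asyncLog.push('Progress: ' + progress));
asyncLog.push('Call done');
```

In the first case 'Call done' is logged last, as in the downloadSoundtrack() output; in the second case it is logged before any timer-driven value.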

Adding an explicit this.detectChanges(); call makes no difference:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    this.detectChanges();
    console.log('Progress: ' + download.progress);
  })
}

The element is still displayed only after all of the Progress log lines.

I also tried an explicit subscription in place of the *ngIf="download$ | async as download" in the template, but it did not help:

downloadInProgress(soundtrack: Soundtrack): boolean {
  let inProgress: boolean = false;
  if (soundtrack.download) {
    if (soundtrack.download.progress > 0 && soundtrack.download.progress < 100) {
      inProgress = true;
    } else if (soundtrack.download.progress == 100) {
      console.log('complete');
      soundtrack.download = undefined;
    }
  }
  console.log('inProgress ' + inProgress);
  return inProgress;
}

The long-running service:

public progressiveCreateSoundtrackMidi(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  return Observable.create((progressTaskBis$: ReplaySubject<ProgressTask<Uint8Array>>) => {
    this.createSoundtrackMidi(soundtrack, progressTaskBis$);
    progressTaskBis$.complete();
    return { unsubscribe() { } };
  });
}

public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: ReplaySubject<ProgressTask<Uint8Array>>): Uint8Array {
  const midi: Midi = new Midi();
  midi.name = soundtrack.name;
  midi.header.name = soundtrack.name;
  let noteIndex: number = 0;
  if (soundtrack.hasTracks()) {
    soundtrack.tracks.forEach((track: Track) => {
      const midiTrack: any = midi.addTrack();
      midiTrack.name = track.name;
      midiTrack.channel = track.channel;
      if (track.hasMeasures()) {
        let totalDurationInSeconds: number = 0;
        for (const measure of track.getSortedMeasures()) {
          if (measure.placedChords) {
            if (!this.notationService.isOnlyEndOfTrackChords(measure.placedChords)) {
              for (const placedChord of measure.placedChords) {
                if (!this.notationService.isEndOfTrackPlacedChord(placedChord)) {
                  const duration: string = placedChord.renderDuration();
                  const durationInSeconds: number = Tone.Time(duration).toSeconds();
                  const velocity: number = placedChord.velocity;
                  // const tempoInMicroSecondsPerBeat: number = this.beatsToMicroSeconds(1, measure.getTempo());
                  // const ticks: number = this.beatsToTicks(durationInBeats, DEFAULT_MIDI_PPQ, tempoInMicroSecondsPerBeat);
                  for (const note of placedChord.notes) {
                    if (!this.notationService.isEndOfTrackNote(note)) {
                      if (progressTask$) {
                        this.commonService.sleep(50);
                        progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), noteIndex));
                      }
                      noteIndex++;
                      midiTrack.addNote({
                        midi: this.synthService.textToMidiNote(note.renderAbc()),
                        time: totalDurationInSeconds,
                        // ticks: ticks,
                        name: note.renderAbc(),
                        pitch: note.renderChroma(),
                        octave: note.renderOctave(),
                        velocity: velocity,
                        duration: durationInSeconds
                      });
                    }
                  }
                  totalDurationInSeconds += durationInSeconds;
                }
              }
            }
          }
        }
      }
    });
  }
  if (progressTask$) {
    progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), soundtrack.getNbNotes(), midi.toArray()));
  }
  return midi.toArray();
}

There is a 50 ms sleep call per note that slows down the file creation, which leaves ample time to observe the behaviour.
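
The implementation of commonService.sleep is not shown here. If it is a synchronous busy-wait (an assumption), nothing else can run on the JavaScript thread while it spins, including timers and rendering. The sketch below contrasts such a busy-wait with a promise-based delay that yields to the event loop; busySleep, asyncSleep, events and run are hypothetical names used only for illustration.

```typescript
// Hypothetical busy-wait: keeps the thread occupied until the deadline passes.
function busySleep(ms: number): void {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin
  }
}

// Promise-based delay: hands control back to the event loop while waiting.
function asyncSleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

const events: string[] = [];
setTimeout(() => events.push('timer fired'), 0);

busySleep(20);
// The 0 ms timer is overdue, but it could not run while the thread was busy.
events.push('after busySleep');

async function run(): Promise<void> {
  await asyncSleep(20);
  // The 0 ms timer had a chance to run while this function was suspended.
  events.push('after asyncSleep');
}

const done: Promise<void> = run();
```

Once done resolves, events holds 'after busySleep', 'timer fired', 'after asyncSleep', in that order.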

The download service implementation is based on this article.

I am using Angular 9.1.0.

Smu*_*tje 1

For simplicity, I suggest moving to a more reactive, state-driven approach. You could do something like the following:

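1. Change soundtracks from an array to a public readonly soundtracks: Observable<Soundtrack[]> = this.soundtracksSubject.asObservable() so that your UI can register for changes. this.soundtracksSubject is then private readonly soundtracksSubject: BehaviorSubject<Soundtrack[]> = new BehaviorSubject([]); and can be used to trigger a refresh for the observers of soundtracks (declare the subject before the observable so that it is initialized first). When you receive the HTTP response, instead of setting this.soundtracks = soundtracks; you call this.soundtracksSubject.next(soundtracks).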

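Additionally, when performing the actual download (soundtrack.download = download;), you have to call the subject again to propagate the change in the model to the listeners, rather than only mutating the model: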

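// Shallow copy of the current list, so that subscribers receive a new array instance.
const updatedSoundtracks: Soundtrack[] = this.soundtracksSubject.value.map(existingSoundtrack => existingSoundtrack);
// Look up the entry for the soundtrack being downloaded (by name here, or whatever identifies your soundtrack).
// The result needs its own name: reusing `soundtrack` would shadow the soundtrack it is compared against.
const target: Soundtrack | undefined = updatedSoundtracks.find(existingSoundtrack => soundtrack.name === existingSoundtrack.name);

if (target) {
    target.download = download;
    this.soundtracksSubject.next(updatedSoundtracks);
}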

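Change your UI from <tr *ngFor="let soundtrack of soundtracks"> to <tr *ngFor="let soundtrack of soundtracks | async"> so that the Observable is resolved for use in the Angular component. This also means that your component registers for changes to soundtracks and is notified whenever someone calls the subject/observable.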

Observable and BehaviorSubject are both RxJS concepts (import {BehaviorSubject, Observable} from "rxjs/index";) and are worth studying, because they make your life a lot easier.
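
As a rough sketch of how these pieces fit together, assuming only the rxjs package: Soundtrack and Download below are simplified stand-in interfaces rather than the application's real classes, and SoundtrackStore, setSoundtracks, setDownload and seen are hypothetical names. Unlike the snippet above, this variation copies the changed entry instead of mutating it in place.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Simplified stand-ins for the application's model classes (assumptions).
interface Download { progress: number; }
interface Soundtrack { name: string; download?: Download; }

class SoundtrackStore {
  // Declared before `soundtracks` so that it is initialized first.
  private readonly soundtracksSubject = new BehaviorSubject<Soundtrack[]>([]);
  public readonly soundtracks: Observable<Soundtrack[]> = this.soundtracksSubject.asObservable();

  public setSoundtracks(soundtracks: Soundtrack[]): void {
    this.soundtracksSubject.next(soundtracks);
  }

  public setDownload(name: string, download: Download): void {
    // Build a new array with a new object for the changed entry, then push
    // it through the subject so that every subscriber is notified.
    const updated: Soundtrack[] = this.soundtracksSubject.value.map(existing =>
      existing.name === name ? { ...existing, download } : existing);
    this.soundtracksSubject.next(updated);
  }
}

const store = new SoundtrackStore();
const seen: number[] = [];

// A BehaviorSubject replays its current value to each new subscriber,
// so this callback runs once immediately with the empty list.
store.soundtracks.subscribe(list =>
  seen.push(list.filter(s => s.download !== undefined).length));

store.setSoundtracks([{ name: 'demo' }, { name: 'other' }]);
store.setDownload('demo', { progress: 42 });
```

Here seen records the number of soundtracks with a download after each emission: the initial empty list, the loaded list, and the list after the download was attached. In a template, the same soundtracks observable would be consumed through the async pipe as described above.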