合并两个可观察对象,单个输出

fil*_*zyk 5 javascript reactive-programming rxjs angular

大家好,我正在尝试掌握RxJS lib和反应式编程的整个思想。我正在尝试将两个可观察对象合并为一个。第一个可观察对象包含对象数组,DefectImages[]第二个可观察对象包含字符串数组,然后将其转换为 DefectImages[]。之后,我想将这两个可观察值合并为一个。

在我的代码下面:

const observable = CachedPhotosBuffer.getInstance().asObservable()
      .pipe(
        switchMap(data => {
          return data.map((url) => DefectImage.createTempImage(url, 'you', Date.now()));
        })
        );
    this.observable = Observable.create(observer => observer.next(this.defectImages));
    this.observable.pipe(
      merge(observable)
    ).subscribe(data => console.log('merge', data))
Run Code Online (Sandbox Code Playgroud)

这种类型的工作与我期望的一样,但此合并的Observables连接到html角度模板。

<ion-list>
    **<ng-container *ngFor="let image of observable | async">**
      <ion-item *ngIf="image.deletedAt === undefined">
        <span class="item-container" (click)="showImage(image)">
          <ion-thumbnail item-start>
            <img id="{{image.url}}" src="{{getUrl(image) + image.url}}">
          </ion-thumbnail>
          <span>
            <p>created at: {{image.createdAt | date: 'd/M/yy H:m'}}</p>
            <p>created by: {{image.createdBy}}</p>
          </span>
        </span>
        <button ion-button item-end (click)="removeImage(image)">
          <ion-icon name="trash"></ion-icon>
        </button>
      </ion-item>
    </ng-container>
  </ion-list>
Run Code Online (Sandbox Code Playgroud)

这是我正在获取的控制台日志 在此处输入图片说明

我的问题是,为什么每个流都有两个单独的日志,而不是一个控制台日志包含所有数据?

Rol*_*new 11

合并可观察对象意味着由这两个可观察对象发出的项目将由新合并的可观察对象(分别参见此页)依次并分别发出。如果您的可观测对象每个仅发射一项,并且您希望通过串联数组来合并这些项目,则可以使用zip运算符,如下所示:

zip(observable, this.observable)
  .pipe(map(x => x[0].concat(x[1])))
  .subscribe(data => console.log('merge', data))
Run Code Online (Sandbox Code Playgroud)

更准确地说,zip(obsa, obsb)创建一个新的可观察对象,它监听obsa和obsb,并在从obsa接收itema和从obsb接收itemb之后发出item x=[itema, itemb]。在您的情况下x[0]=itemax[1]=itemb是数组,并将(x => x[0].concat(x[1]))这两个数组连接在一起。请注意,如果obsa和obsb发出多个数组,则压缩的observable将始终等待从obsa中获得一项,从obsb中获得一项,然后再发出新的[itema, itemb]。对于concat()方法,请参见此页面

并且不要忘记import { zip } from 'rxjs'import { map } from 'rxjs/operators'