标签: rxjs-observables

如何在链接的 observables 之间传递结果

抽象问题:每次源 Observable 发出和事件时,都需要触发一系列 API 调用和 Angular 服务。其中一些调用取决于以前的结果。

在我的示例中,源 ObservablestartUpload$触发了一系列依赖调用。

使用解构可以这样写:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()
Run Code Online (Sandbox Code Playgroud)

它几乎读起来像是一种命令式方法。但它有一些问题:

  • 解构链在代码中不断重复并变得越来越长 { …

rxjs typescript angular rxjs-observables

29
推荐指数
3
解决办法
3901
查看次数

在 Jest 中,如何对订阅 observable 的方法进行单元测试

我的组件中有各种方法订阅注入依赖项中的方法,这些方法返回 observables。

我想编写 Jest 单元测试以确保当这些 observables 返回/错误时,我的方法做正确的事情。

在下面的示例中,我正在尝试编写一个测试来检查是否doAThing已触发。以下测试均无效。他们都因错误而失败,例如

'returnMyObservable.subscribe 不是一个函数'。

// Example method to test component 
public testFunction (): void {
    this.myService.returnMyObservable.subscribe(
        ( value ) => this.doAThing( value )
    )
}
Run Code Online (Sandbox Code Playgroud)
// Example method to test component 
public testFunction (): void {
    this.myService.returnMyObservable.subscribe(
        ( value ) => this.doAThing( value )
    )
}
Run Code Online (Sandbox Code Playgroud)

unit-testing typescript jestjs rxjs-observables

8
推荐指数
2
解决办法
8768
查看次数

如何在订阅方法中取消订阅 RXJS 订阅?

我有一些 javascript:

this.mySubscription = someObservable.subscribe((obs: any) => {
   this.mySubscription.unsubscribe();
   this.mySubscription = undefined;
}
Run Code Online (Sandbox Code Playgroud)

执行时,控制台会记录错误ERROR TypeError: Cannot read property 'unsubscribe' of undefined。我想知道为什么我不能在 subscribe lambda 函数中取消订阅。有正确的方法吗?我已经阅读了一些关于使用虚拟主题并完成它们或使用 takeUntil/takeWhile 和其他管道操作符 workArounds 的内容。

在订阅的订阅功能中取消订阅订阅的正确方法/解决方法是什么?

我目前正在使用一个虚拟订阅,如下所示:

mySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(undefined);


// when I do the subscription:
dummySubscription: BehaviorSubject<any> = new BehaviourSubject<any>(this.mySubscription.getValue());
this.mySubscription = someObservable.subscribe((obs: any) => {
    // any work...
    dummySubscription.next(obs);
    dummySubscription.complete();
    dummySubscription = undefined;
}, error => {
    dummySubscription.error(error);
});

dummySubscription.subscribe((obs: any) => {
    // here the actual work to do when …
Run Code Online (Sandbox Code Playgroud)

rxjs rxjs-observables

6
推荐指数
1
解决办法
1960
查看次数

检测主题何时不再有订阅

我正在实现一个角度服务,让消费者根据他们的 id 观察各种值:

它的本质是这样的:

private subjects = new Map<number, Subject<any>>();

public subscribe(id: number, observer: any): Subscription {
  // try getting subject for this id (or undefined if it does not yet exist)
  let subj = this.subjects.get(id);

  // create subject if it does not yet exist
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  // subscribe observer
  const subscription = subj.subscribe(observer);

  // set up teardown logic (gets called when subscription is unsubscribed)
  subscription.add(() => { 
    // remove subject from …
Run Code Online (Sandbox Code Playgroud)

rxjs subject-observer angular rxjs-observables

5
推荐指数
1
解决办法
3421
查看次数

使用 RxJS 进行 Angular 异步延迟渲染

我正在寻找在 Angular 中使用 RxJS 进行“延迟渲染”的方法,我想要实现的目标如下:

<div *ngFor="let item of items$ | async">
  {{item.text}}
<div>
Run Code Online (Sandbox Code Playgroud)

在我的组件中:

export class ItemsComponent implements OnInit {
  public items$: Observable<Item[]>;
  
  constructor(private setStore: SetStore){}

  ngOnInit() {
     const setId = 1;
     this.items$ = this.setStore.sets$.pipe(map(sets => sets.find(set => set.id = 1).items));
  }
}
Run Code Online (Sandbox Code Playgroud)

这工作正常,但当集合有 +50 个项目时,渲染需要时间并且冻结一秒或更长时间。我一直在寻找一种懒惰的方法,通过某种方式渲染前 30 个项目,然后在 500 毫秒后加载下一个 30 个项目,依此类推,直到列表到达末尾。

编辑: 我尝试过这种方法:


const _items$ = this.setStore.sets$.pipe(
  map(sets => sets.find(set => set.id == 1).items)
);
const loadedItems = [];
_items$.subscribe(data => {
  this.items$ = from(data).pipe(
    concatMap(item => { …
Run Code Online (Sandbox Code Playgroud)

asynchronous rxjs angular rxjs-observables

5
推荐指数
1
解决办法
4584
查看次数

对基于自定义 ReplaySubject 的可观察对象的同步阻塞订阅调用

该视图包含以下元素:

<div *ngIf="showMe">Hello</div>
Run Code Online (Sandbox Code Playgroud)

调用组件方法时:

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}
Run Code Online (Sandbox Code Playgroud)

该元素显示在视图中所有Progress记录器之前。事情也应该如此。这种基于 HTTP 的下载工作得很好。

然而,当调用组件方法时:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}
Run Code Online (Sandbox Code Playgroud)

该元素最后显示在视图中的所有Progress记录器之后。它不应该是这样的。这个ReplaySubject基于自定义的可观察对象没有按预期工作。事实上,该元素应该显示在所有记录器之前而不是之后Progress

我想看看一个订阅调用是否被阻塞。

于是我把这两种方法改成了:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: …
Run Code Online (Sandbox Code Playgroud)

observable angular rxjs-observables

5
推荐指数
1
解决办法
528
查看次数

ngIf 异步管道作为仅进行空检查的值

我有一个可观察量,我想在 ngIf 中创建一个变量,并且仅在值为 null 时返回 false (可观察量返回一个数字)

我需要显式检查 null,因为我的 observable 可以返回 0 作为触发 else 块的值。

我尝试过以下方法

*ngIf="(observable$ | async) as obs; obs !== null; esle #elseTemplate"
Run Code Online (Sandbox Code Playgroud)
*ngIf="((observable$ | async) as obs) !== null; esle #elseTemplate"
Run Code Online (Sandbox Code Playgroud)
*ngIf="(observable$ | async) !== null; $implicit = obs; else #elseTemplate"
// this returns the boolean 
Run Code Online (Sandbox Code Playgroud)

我当前的解决方案看起来不太优雅是

*ngIf="(observable$ | async) !== null; esle #elseTemplate"
{{ observable$ | async }}
Run Code Online (Sandbox Code Playgroud)

我正在使用 Angular 10。

rxjs angular rxjs-observables async-pipe angular10

5
推荐指数
1
解决办法
4676
查看次数

通过可观察值进行映射并返回嵌套可观察值的值

希望你能帮助我。

\n

我有一个名为 getEnergyFromPower 的函数,它返回一串值:

\n
getEnergyFromPower(systemId: number, objectId?: string) {\n    ...\n\n    return this.httpClient.get(this.BASE_URL + this.AGGREGATE_URL, {\n      headers: this.BEARER_OPTIONS.headers,\n      responseType: 'text',\n      params: params\n    })\n  }\n\n
Run Code Online (Sandbox Code Playgroud)\n

另一个应该循环遍历 Observable 的值。\n但是当我订阅和 console.log 它时,它返回一个可观察数组,如下所示:

\n
organizeEnergy(systemId: number = 49439) {\n    return this.getSystemLayout(systemId).pipe(\n      map(res => {\n        return res.map(({object_id}) => this.getEnergyFromPower(49439, object_id))\n      })\n    )\n  }\n\n\nthis.optService.organizeEnergy().subscribe(res => console.log(res))\n\n// Returns: \n// [Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable]\n\n
Run Code Online (Sandbox Code Playgroud)\n

我测试了 object_id 是否正确并且它返回正确的值,如下所示:

\n
organizeEnergy(systemId: number = 49439) {\n    return this.getSystemLayout(systemId).pipe(\n      map(res => {\n        return res.map(({object_id}) => …
Run Code Online (Sandbox Code Playgroud)

javascript observable rxjs angular rxjs-observables

5
推荐指数
1
解决办法
332
查看次数

如何在 RxJS 中等待一致状态?

我有一个代码来获取与之相关的书籍和借书卡:

// mimic http requests
const fetchBook = (bookId: number) => {
    const title = 'Book' + bookId;
    return timer(200).pipe(mapTo({ bookId, title }));
}
const fetchLibraryCard = (bookId: number) => {
    const borrowerName = 'Borrower of Book' + bookId;
    return timer(300).pipe(mapTo({ borrowerName }));
}

const bookId$ = new Subject<number>();

const book$ = bookId$.pipe(
    switchMap(bookId => fetchBook(bookId)),
    shareReplay(1)
);

// e.g. 'Refresh library card' button
const libraryCardUpdater$ = new BehaviorSubject<void>(undefined);

const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$]).pipe(
    switchMap(([bookId]) => fetchLibraryCard(bookId)),
    shareReplay(1)
);

combineLatest([book$, …
Run Code Online (Sandbox Code Playgroud)

rxjs rxjs6 rxjs-observables

5
推荐指数
3
解决办法
408
查看次数

我需要取消订阅 Angular Observable 吗?

令人惊讶的是,很难得到关于是否以及何时应该取消订阅 Angular Observable 的直接答案。

我有以下场景:

this.subscription = this.service.makeHTTPCall(stuff).subscribe(x => {
//do something
});
Run Code Online (Sandbox Code Playgroud)

我看到的解决方案有以下几种:

  1. 不要将订阅存储为变量,这是否意味着我不必取消订阅?

    this.service.makeHTTPCall(stuff).subscribe(x => {
    //do something
    });
    
    Run Code Online (Sandbox Code Playgroud)
  2. 将订阅存储为变量并在 ngOnDestroy 中取消订阅

    ngOnDestroy() {
    if (this.subscription) { this.subscription.unsubscribe(); }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  3. 什么也不做,Angular 会为你整理所有取消订阅的内容

我知道有像 之类的第三方库ng-take-until-destroy,但假设我们没有任何第三方库,这是建议的取消订阅方法吗?

rxjs typescript angular rxjs-observables

5
推荐指数
2
解决办法
8236
查看次数