如何创建一个 Observable 仅在订阅时每 10 秒(间隔)更新一次值?

L. *_*der 6 rxjs ngrx angular

我正在用 RxJS 构建一个有角度的应用程序。在我的 StoreService 中,我想要一个 Observable,我的组件可以订阅它。Observable public unreadMessagesCount: Observable<number>保存来自 api 的未读消息的值。

应该每 10 秒完成一次 api 调用,并且 observable 应该用新值更新自己。但是如果没有组件订阅了这个 observable,则不应进行轮询。当有人订阅时,应该立即完成。这样数据从一开始就是最新的。

有没有一种优雅的方式用 RxJS 来构建它?提前致谢!

小智 5

用于timer每 N 秒创建一个可观察的发射,从 M 秒开始:

interval$ = timer(1, 10000);
Run Code Online (Sandbox Code Playgroud)

然后您可以声明您的 API 调用:

APICall$ = this.http.get('...')
Run Code Online (Sandbox Code Playgroud)

最后,从这两个创建一个可观察的:

pulledData$ = this.interval$.pipe(
  switchMap(() => this.APICall$)
);
Run Code Online (Sandbox Code Playgroud)

现在,除非您订阅它,否则您将没有任何价值。但是,一旦您订阅了它,您就会进行 API 调用(尽管在 1 毫秒之后),并且每十秒就会更新您的调用。请记住在销毁组件时取消订阅!


jo_*_*_va 5

您可以使用intervalswitchMap创建一个可观察的对象,当您订阅它时,它将开始轮询您的 API:

import { interval } from 'rxjs';

const unreadMessagesCount$ = interval(10000).pipe(switchMap(_ => apiCall()));
Run Code Online (Sandbox Code Playgroud)

但是,对于您进行的每个订阅,API 调用都会重复。如果您不希望发生这种情况,则必须在所有订阅者之间共享可观察的源。您可以为此使用多播

import { interval, Subject } from 'rxjs';
import { multicast} from 'rxjs/operators';

const unreadMessagesCount$ = interval(10000).pipe(
  switchMap(_ => apiCall()),
  multicast(() => new Subject())
);
Run Code Online (Sandbox Code Playgroud)

然后你只需要调用connectobservable:

unreadMessagesCount$.subscribe(value => ...);
unreadMessagesCount$.subscribe(value => ...);

unreadMessagesCount$.connect();
Run Code Online (Sandbox Code Playgroud)

请参阅此 Stackblitz 演示