自定义 rxjs 运算符(缓冲区)阻止 Angular 应用程序稳定

Mac*_*cik 6 rxjs angular

我正在开发一个自定义 rxjs 运算符来扩展bufferTime运算符功能。BufferTime 需要时间跨度和缓冲区的最大大小(可选),但在我的情况下,我不仅需要根据时间和大小,还需要根据附加触发器(routeChange)重置缓冲区。

我创建了一个自定义运算符来处理这个问题,它需要 timeSpan、maxSize 和 trigger$,为 timeSpan 缓冲区和 size 缓冲区创建可观察量,最后将这 3 个缓冲区合并在一起并作为运算符的单个通知程序传递buffer

这是源代码:

function bufferFrom<T>(...triggers: Observable<any>[]): (source$: Observable<T>) => Observable<T[]> {
   // restart Subject used for taking control over all triggers and resetting them on emit
   const restart: Subject<void> = new ReplaySubject<void>(1);

   // function that wraps trigger with restart Observable to take control over that
   const withRestart: <R>(source: Observable<R>) => Observable<R> = <R>(source: Observable<R>) => restart.asObservable().pipe(switchMap(() => source));

   // create Observable from all triggers connected with restart control
   const bufferTrigger = merge(...triggers.map(withRestart));

   // emit first reset to enable Observables initially
   restart.next();
 
   // return the source$ Observable with buffer, and reset applied after buffer is emitted
   return source$ => source$.pipe(
      buffer(bufferTrigger),
      tap(() => restart.next()),
      filter(items => items.length > 0),
   );
}

export function bufferAll<T>(bufferTimeSpan: number, bufferSize: number, trigger$: Observable<any>): (source$: Observable<T>) => Observable<T[]> {
   return (source$) => {
      const size$ = source$.pipe(bufferCount(bufferSize));
      const timeSpan$ = source$.pipe(bufferTime(bufferTimeSpan * 1000));

      return source$.pipe(bufferFrom(size$, timeSpan$, trigger$));
   };
}
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,在 Angular 应用程序中,在 root 中提供的服务中使用此类运算符会导致应用程序不稳定,从而阻止应用程序的渲染。

以下是我尝试在角度区域之外运行代码的方法,但这在我的情况下不起作用。但是,如果我使用简单的bufferTime实现而不是我的实现,则它可以与下面的代码配合使用。

 this.zone.runOutsideAngular(() => {
    const routeChanges$ = this.router.events.pipe(filter(event => event instanceof NavigationEnd));

    this.dataSubject.asObservable().pipe(
      // Send after 60 seconds or when 10 items in buffer or when route changes
      bufferAll(60, 10, routeChanges$),
    ).subscribe((data) => {
      this.zone.run(() => console.log(data));
    });
});
Run Code Online (Sandbox Code Playgroud)

我不知道为什么我的操作符会阻止 Angular,但又bufferTime不会,因为它们都根据时间/间隔创建无限的 Observables,根据 Angular 文档https://angular.io/api/core/ApplicationRef#isstable -examples-and-caveats 可能是阻止渲染和应用程序稳定状态的问题。

知道我的实施中可能存在什么问题吗?我错过了什么?

添加了简短的stackblitz 复制品

data.service.ts 包括自定义运算符代码以及bufferTimebufferCount运算符的正常用法。当使用bufferTimebufferCount运营商应用程序稳定时,如果注释掉并切换到我的运营商将变得永久不稳定。