我正在开发一个自定义 rxjs 运算符来扩展bufferTime运算符功能。BufferTime 需要时间跨度和缓冲区的最大大小(可选),但在我的情况下,我不仅需要根据时间和大小,还需要根据附加触发器(routeChange)重置缓冲区。
我创建了一个自定义运算符来处理这个问题,它需要 timeSpan、maxSize 和 trigger$,为 timeSpan 缓冲区和 size 缓冲区创建可观察量,最后将这 3 个缓冲区合并在一起并作为运算符的单个通知程序传递buffer。
这是源代码:
function bufferFrom<T>(...triggers: Observable<any>[]): (source$: Observable<T>) => Observable<T[]> {
// restart Subject used for taking control over all triggers and resetting them on emit
const restart: Subject<void> = new ReplaySubject<void>(1);
// function that wraps trigger with restart Observable to take control over that
const withRestart: <R>(source: Observable<R>) => Observable<R> = <R>(source: Observable<R>) => restart.asObservable().pipe(switchMap(() => source));
// create Observable from all triggers connected with restart control
const bufferTrigger = merge(...triggers.map(withRestart));
// emit first reset to enable Observables initially
restart.next();
// return the source$ Observable with buffer, and reset applied after buffer is emitted
return source$ => source$.pipe(
buffer(bufferTrigger),
tap(() => restart.next()),
filter(items => items.length > 0),
);
}
export function bufferAll<T>(bufferTimeSpan: number, bufferSize: number, trigger$: Observable<any>): (source$: Observable<T>) => Observable<T[]> {
return (source$) => {
const size$ = source$.pipe(bufferCount(bufferSize));
const timeSpan$ = source$.pipe(bufferTime(bufferTimeSpan * 1000));
return source$.pipe(bufferFrom(size$, timeSpan$, trigger$));
};
}
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是,在 Angular 应用程序中,在 root 中提供的服务中使用此类运算符会导致应用程序不稳定,从而阻止应用程序的渲染。
以下是我尝试在角度区域之外运行代码的方法,但这在我的情况下不起作用。但是,如果我使用简单的bufferTime实现而不是我的实现,则它可以与下面的代码配合使用。
this.zone.runOutsideAngular(() => {
const routeChanges$ = this.router.events.pipe(filter(event => event instanceof NavigationEnd));
this.dataSubject.asObservable().pipe(
// Send after 60 seconds or when 10 items in buffer or when route changes
bufferAll(60, 10, routeChanges$),
).subscribe((data) => {
this.zone.run(() => console.log(data));
});
});
Run Code Online (Sandbox Code Playgroud)
我不知道为什么我的操作符会阻止 Angular,但又bufferTime不会,因为它们都根据时间/间隔创建无限的 Observables,根据 Angular 文档https://angular.io/api/core/ApplicationRef#isstable -examples-and-caveats
可能是阻止渲染和应用程序稳定状态的问题。
知道我的实施中可能存在什么问题吗?我错过了什么?
添加了简短的stackblitz 复制品
data.service.ts 包括自定义运算符代码以及bufferTime或bufferCount运算符的正常用法。当使用bufferTime或bufferCount运营商应用程序稳定时,如果注释掉并切换到我的运营商将变得永久不稳定。
| 归档时间: |
|
| 查看次数: |
271 次 |
| 最近记录: |