rxjs - 缓冲流直到函数返回 true

MJ_*_*les 5 javascript rxjs

我有一个数字流,这些数字以我想要子采样的恒定数量增加。给定一个常量样本interval,我想缓冲流,直到第一个和最后一个缓冲值之间的差异大于或等于interval。然后它会发出这个数组,所以类似于缓冲区操作符。

我已经搜索了不同的 rxjs 运算符,但无法弄清楚如何使其工作。一个bufferUntil运营商将是完美的,但似乎并不存在。我该如何实施?

例如:

const interval = 15;
//example stream would be: 5, 10 , 15, 20, 25, 30..

Observable.pipe(
   bufferUntil(bufferedArray => {
       let last = bufferedArray.length - 1;
       return (bufferedArray[last] - bufferedArray[0] >= interval);
   })
).subscribe(x => console.log(x));

//With an expected output of [5, 10, 15, 20], [ 25, 30, 35, 40],..
Run Code Online (Sandbox Code Playgroud)

fri*_*doo 5

您可以实现一个维护自定义缓冲区的运算符。将所有传入值推入缓冲区,当满足给定条件时发出缓冲区并重置它。用于defer为每个订阅者提供自己的缓冲区。

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    let buffer: T[] = []; // custom buffer
    return source.pipe(
      tap(v => buffer.push(v)), // add values to buffer
      switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
      tap(() => buffer = []) // clear the buffer
    )
  });
}
Run Code Online (Sandbox Code Playgroud)

https://stackblitz.com/edit/rxjs-7awqmv

请注意,一旦完成,上述代码将不会发出任何剩余的项目source。采取以下补救措施:

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    const buffer: T[] = [];
    return source.pipe(
      tap(v => buffer.push(v)), // add values to buffer
      switchMap(() => emitWhen(buffer) ? of([...buffer]) : EMPTY),
      tap(() => buffer.splice(0, buffer.length)),
      endsWith(buffer)
    )
  });
}
Run Code Online (Sandbox Code Playgroud)