如何创建一个耗尽但始终在当前完成后触发最后一个事件的联合可观察对象?

jhp*_*jhp 6 observable rxjs ngrx

我有以下理论问题需要解决,请注意,对于我描述的示例问题可能有更通用的解决方案,但我特别想知道如何创建具有这些属性的连接可观察量。

我有一个可观察的更改事件,应该触发保存操作(这会导致可观察到保存成功)。

  1. 我需要确保最后的保存事件肯定会被执行
  2. 保存本身是一个复杂的过程,需要一些时间,并且在保存事件期间不应执行其他保存操作

使用exhaust或exhaustMap几乎可以实现我想要的:它确保在保存过程中不会触发其他事件。虽然concat或 concatMap 会确保最后一个会被执行,但我会做很多不必要的保存操作。

换句话来说:如何创建一个将耗尽并连接最后一个事件的可观察对象?

fri*_*doo 8

您可以使用throttleconfigleading: true, trailing: true来发出第一个事件,然后在发出可观察值之前不发出任何事件,然后在这段时间内接收到最后一个事件。请参阅throttleTime 操作员的配置参数如何工作?(油门配置)

映射到您想要执行的可观察对象(保存操作)。当保存操作完成时,使用Subject和结束限制间隔。finalize

无论您是否使用mergeMapexhaustMapconcatMap来映射到您的内部可观察量都无关紧要,因为throttle运算符仅在您内部可观察量完成时发出下一个事件。

如果您使用此逻辑创建自定义运算符函数,则必须包装代码,defer以便不同的订阅者不会共享相同的主题,而是每个订阅者都会获得自己的新主题。

export function exhaustMapWithTrailing<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
  return (source): Observable<R> => defer(() => {
    const release = new Subject()

    return source.pipe(
      throttle(() => release, { leading: true, trailing: true }),
      exhaustMap((value, index) => from(project(value, index)).pipe(
        finalize(() => release.next())
      ) as Observable<R> )
    )
  })
}
Run Code Online (Sandbox Code Playgroud)
events$.pipe(
  exhaustMapWithTrailing(event => save(event))
)
Run Code Online (Sandbox Code Playgroud)

https://stackblitz.com/edit/rxjs-5k6egc?file=index.ts

此代码改编自此处https://github.com/ReactiveX/rxjs/issues/5004