RxJs - concatMap 替代方案,可以删除介于两者之间的所有内容

Bar*_*urg 5 rxjs

我试图找到一个行为类似于 的运算符concatMap,但丢弃介于两者之间的所有内容。例如, concatMap 执行以下操作:

  • 下一个
  • 开始处理
  • 下一个 b
  • 下一个 c
  • 完成处理一个
  • 开始处理 b
  • 完成处理 b
  • 开始处理 c
  • 完成处理 c

相反,我正在寻找一种会 drop 的机制b,因为c已经进来了:

  • 下一个
  • 开始处理
  • 下一个 b
  • 下一个 c
  • 完成处理一个
  • (跳过 b)
  • 开始处理 c
  • 完成处理 c

有关更扩展的示例,请参阅此要点:https : //gist.github.com/Burgov/afeada0d8aad58a9592aef6f3fc98543

dmc*_*dle 5

我认为您正在寻找的运营商是throttle

这是一个正在运行的Stackblitz。完成这项工作的关键是设置传递到的配置对象,该对象允许它在运行throttle()期间发出(和处理)前导和尾随源排放,但忽略其间的任何排放。processData()

这是 Stackblitz 的关键函数:

// Use 'from' to emit the above array one object at a time
const source$ = from(sourceData).pipe(

  // Simulate a delay of 'delay * delayInterval' between emissions
  concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),

  // Now tap in order to show the emissions on the console.
  tap(data => console.log('next ', data.emit)),

  // Finally, throttle as long as 'processData' is handling the emission
  throttle(data => processData(data), { leading: true, trailing: true }),

).subscribe()
Run Code Online (Sandbox Code Playgroud)

简短而甜蜜,除了一个问题之外,按要求工作......

更新:

上面代码的“一个问题”是,当源可观察对象完成时,throttle()取消订阅 processData,从而有效地停止需要完成的任何最终处理。正如 Bart van den Burg 在下面的评论中指出的那样,修复方法是使用主题。我认为有很多方法可以做到这一点,但 Stackblitz 已使用以下代码进行了更新,现在可以使用:

// Set up a Subject to be the source of data so we can manually complete it
const source$ = new Subject();

// the data observable is set up just to emit as per the gist.
const dataSubscribe = from(sourceData).pipe(
    // Simulate a delay of 'delay * delayInterval' before the emission
    concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
).subscribe(data => {
    console.log('next ', data.emit); // log the emission to console
    source$.next(data); // Send this emission into the source
});

// Finally, subscribe to the source$ so we can process the data
const sourceSubscribe = source$.pipe(
    // throttle as long as 'processData' is handling the emission
    throttle(data => processData(data), { leading: true, trailing: true })
).subscribe(); // will need to manually unsubscribe later ...
Run Code Online (Sandbox Code Playgroud)