根据rxjs中的时间处理事件流

Gre*_*Ros 7 javascript asynchronous node.js rxjs

我有一个进程,每隔一段时间发送一次数据包,我需要根据数据包到达的时间等来管理该数据流.在某些时候,我也关闭流和过程.

现在,我正在使用一组计时器来做这件事,但我希望我能做到这一点,rxjs因为它似乎非常适合这种事情.到目前为止,我没有取得多大成功.

问题

该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住.

我希望在以下条件下关闭流:

  1. 如果需要的不仅仅是startDelay向我发送第一个数据包.
  2. 发送第一个数据包后,如果暂停时间超过middleDelay两个数据包.
  3. 经过一段时间maxChannelTime.

由于上述任何原因我即将关闭流时,我首先要求它礼貌地关闭以便它可以进行一些清理.有时它还会在清理过程中向我发送最终数据包.但是我想等待cleanupTime清理和最后一个数据到达之前我关闭流并忽略更多消息.

我将通过使用Observable包装事件来创建"流".我这样做没有问题.

通过"关闭"流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡).

Ric*_*sen 2

棘手的问题。

我将其分为两个阶段:“监管”(因为我们要定期检查)和“清理”。

向后工作,输出是

const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
Run Code Online (Sandbox Code Playgroud)

“Closers”是在关闭时间时发出的可观察量(每个超时值一个更近的值)。

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeUntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeUntil(startCloser)                               // cancel if startTimeout
  .takeUntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
     Observable.timer(cleanupTimeout)           // emit once after cleanup time
  ) 
  .mapTo('cleanupTimeoutMarker')
Run Code Online (Sandbox Code Playgroud)

这是一个工作示例CodePen (请一次运行一个测试)