Gre*_*Ros 7 javascript asynchronous node.js rxjs
我有一个进程,每隔一段时间发送一次数据包,我需要根据数据包到达的时间等来管理该数据流.在某些时候,我也关闭流和过程.
现在,我正在使用一组计时器来做这件事,但我希望我能做到这一点,rxjs因为它似乎非常适合这种事情.到目前为止,我没有取得多大成功.
该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住.
我希望在以下条件下关闭流:
startDelay向我发送第一个数据包.middleDelay两个数据包.maxChannelTime.由于上述任何原因我即将关闭流时,我首先要求它礼貌地关闭以便它可以进行一些清理.有时它还会在清理过程中向我发送最终数据包.但是我想等待cleanupTime清理和最后一个数据到达之前我关闭流并忽略更多消息.
我将通过使用Observable包装事件来创建"流".我这样做没有问题.
通过"关闭"流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡).
棘手的问题。
我将其分为两个阶段:“监管”(因为我们要定期检查)和“清理”。
向后工作,输出是
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
Run Code Online (Sandbox Code Playgroud)
“Closers”是在关闭时间时发出的可观察量(每个超时值一个更近的值)。
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
.takeUntil(source) // cancel after source emits
.mapTo('startTimeoutMarker')
const intervalCloser = source.switchMap(x => // reset interval after each source emit
Observable.timer(intervalTimeout) // emit once after intervalTimeout
.mapTo('intervalTimeoutMarker')
)
const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
.takeUntil(startCloser) // cancel if startTimeout
.takeUntil(intervalCloser) // cancel if intervalTimeout
.mapTo('maxtimeTimeoutMarker')
const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
const cleanupCloser = close.switchMap(x => // start when close emits
Observable.timer(cleanupTimeout) // emit once after cleanup time
)
.mapTo('cleanupTimeoutMarker')
Run Code Online (Sandbox Code Playgroud)
这是一个工作示例CodePen (请一次运行一个测试)