我正在研究一个用例,要求如果一个observable在一定时间内没有发出值,那么我们应该做一些副作用.
给出一个实际的用例:
这需要在每个发射值上启动计时器,并且在初始订阅observable时启动计时器,然后在指定的时间之后运行某个函数或者直到发出计时器重置的值.我正在努力做到这一点的Rx方式.任何帮助,将不胜感激 :)
你可以这样做race:
timer(5000).race(someSource$)
.subscribe(notifyUser);
Run Code Online (Sandbox Code Playgroud)
如果someSource$通知速度超过timer(5000)(5 秒),则someSource$“获胜”并继续存在。
如果您只想要 一个值someSource$,您显然可以使用 atake(1)或first()on someSource$,这将解决该问题。
我希望这有帮助。
debounceTime是您正在寻找的运算符:如果在特定超时内没有其他符号,则它只会发出一个值.通过侦听debounced流的第一条消息,您可以超时并清理websocket连接.如果你需要从流的开头开始超时,你可以简单地startWith.具体来说:
messages$.startWith(null)
.debounceTime(timeout)
.take(1)
.subscribe(() => { /* side effects */ });
Run Code Online (Sandbox Code Playgroud)
编辑:如果不是你正在寻找结束的消息流完全当它超时(例如,你在的onComplete处理程序清理),只是硬塞debounceTime到takeUntil:
messages$.takeUntil(
messages$.startWith(null)
.debounceTime(timeout)
).subscribe(timeout_observer);
Run Code Online (Sandbox Code Playgroud)
用timeout_observable: Observer<TMessage>它包含你的清理onComplete.
| 归档时间: |
|
| 查看次数: |
933 次 |
| 最近记录: |