如果observable在X时间内没有发出值,则会产生副作用

Sam*_*abá 5 rxjs

我正在研究一个用例,要求如果一个observable在一定时间内没有发出值,那么我们应该做一些副作用.

给出一个实际的用例:

  • 打开Web套接字连接
  • 如果在X时间内没有发送/接收消息,则关闭Web套接字连接并通知用户

这需要在每个发射值上启动计时器,并且在初始订阅observable时启动计时器,然后在指定的时间之后运行某个函数或者直到发出计时器重置的值.我正在努力做到这一点的Rx方式.任何帮助,将不胜感激 :)

Ben*_*esh 6

你可以这样做race

timer(5000).race(someSource$)
  .subscribe(notifyUser);
Run Code Online (Sandbox Code Playgroud)

如果someSource$通知速度超过timer(5000)(5 秒),则someSource$“获胜”并继续存在。

如果您只想要 一个值someSource$,您显然可以使用 atake(1)first()on someSource$,这将解决该问题。

我希望这有帮助。


con*_*cat 5

debounceTime是您正在寻找的运算符:如果在特定超时内没有其他符号,则它只会发出一个值.通过侦听debounced流的第一条消息,您可以超时并清理websocket连接.如果你需要从流的开头开始超时,你可以简单地startWith.具体来说:

messages$.startWith(null)
         .debounceTime(timeout)
         .take(1)
         .subscribe(() => { /* side effects */ });
Run Code Online (Sandbox Code Playgroud)

编辑:如果不是你正在寻找结束的消息流完全当它超时(例如,你在的onComplete处理程序清理),只是硬塞debounceTimetakeUntil:

messages$.takeUntil(
  messages$.startWith(null)
           .debounceTime(timeout)
).subscribe(timeout_observer);
Run Code Online (Sandbox Code Playgroud)

timeout_observable: Observer<TMessage>它包含你的清理onComplete.