Reactive Extensions超时不会停止序列?

Tim*_*Tim 5 c# system.reactive

我试图让一个IObservable<bool>返回true如果一个UDP消息在最后5秒,如果发生超时被接收,一个返回false.

到目前为止我有这个:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);

    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}
Run Code Online (Sandbox Code Playgroud)

这个问题是: -

  • 一旦返回false,序列就会停止
  • 我只是真的需要truefalse改变状态.

我可以使用a Subject<T>但是UDPBaseStringListener当没有更多订阅者时我需要小心处理observable.

更新

每次我收到UDP消息,我都希望它返回一个true.如果我在最近5秒内没有收到UDP消息,我希望它返回一个false.

Séb*_*ion 5

正如Bj0所指出的,解决方案BufferWithTime不会在接收到数据点后立即返回数据点,并且在接收到数据点后不会重置缓冲区超时。

使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区重载来解决这两个问题:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}
Run Code Online (Sandbox Code Playgroud)