如何使用 RX 实现超时序列?

Zor*_*goZ 4 c# system.reactive .net-4.6

场景如下:如果设备在短时间内回调到服务器,则认为设备已连接。我想创建一个类来封装跟踪此状态的功能。调用设备时,应重置超时。回调时,连接被确认,状态应设置为true,如果回调超时,则应设置为false。但是下次调用应该可以重新设置超时时间,不管当前状态如何。

我想通过 RX 使用swith和来实现这一点timeout。但我不知道为什么它停止工作。

public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();

public bool IsConnected { get; private set; }

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
        .Switch()
        .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}

public void ConfirmConnected()
{
    connected.OnNext(true);
}

public void SetPending()
{
    pending.OnNext(true);
}
}
Run Code Online (Sandbox Code Playgroud)

这是“测试用例”:

var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!
Run Code Online (Sandbox Code Playgroud)

我假设内部 observable 的超时也会停止外部 observable。由于outer =>不再调用 lambda。什么是正确的方法?

谢谢

Eni*_*ity 5

这是在IsConnected不使用的情况下生成值流的另一种方法.TimeOut

public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}
Run Code Online (Sandbox Code Playgroud)

Observable.Amb操作人员只需花费从任何可观察到的第一个产生值的值-这是最好例外编码。