无效扩展固定当呼叫长于间隔长度时,异步调用之间的间隔

KOT*_*TIX 6 c# unit-testing system.reactive

这是我的Interval定义:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                .Select(l => Observable.FromAsync(DoWork))
                .Concat()
                .Subscribe();
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,我提供ISchedulerin IntervalObserveOnfrom,SchedulerProvider以便我可以更快地进行单元测试(TestScheduler.AdvanceBy).另外,DoWork是一种async方法.

在我的特定情况下,我希望DoWork每5秒调用一次该函数.这里的问题是我希望5秒是DoWork另一个结束和开始之间的时间.因此,如果DoWork执行时间超过5秒,假设为10秒,则第一次呼叫为5秒,第二次呼叫为15秒.

不幸的是,以下测试证明它不像那样:

[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{    
    m_queryHelperMock
        .Setup(x => x.CustomQueryAsync())
        .Callback(() => Thread.Sleep(10000))
        .Returns(Task.FromResult(new QueryCompletedEventData()))
        .Verifiable()
    ;

    var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
    multiPluginStatusHelper.MillisecondsInterval = 5000;
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);

    m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}
Run Code Online (Sandbox Code Playgroud)

DoWork调用CustomQueryAsync和测试失败说的就是被称为两次.它应该只被调用一次,因为强制延迟.Callback(() => Thread.Sleep(1000)).

我在这做错了什么?

我的实际实现来自这个例子.

Bra*_*don 8

通常在轮询一些不可观察的数据源时会出现这个问题.当我遇到它时,我使用了RepeatAfterDelay一段时间后写的运算符:

public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    var repeatSignal = Observable
        .Empty<T>()
        .Delay(delay, scheduler);

    // when source finishes, wait for the specified
    // delay, then repeat.
    return source.Concat(repeatSignal).Repeat();
}
Run Code Online (Sandbox Code Playgroud)

这就是我使用它的方式:

// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
    .FromAsync(DoWork)
    .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe();

// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
    .Timer(TimeSpan.FromSeconds(5), scheduler)
    .SelectMany(_ => Observable
        .FromAsync(DoWork)
        .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
    .Subscribe();
Run Code Online (Sandbox Code Playgroud)