在Rx中安排定期事件的正确方法是什么?

Dan*_*try 3 c# system.reactive

简单的问题,我希望:我正在编写一个应用程序,我想从数据库中检索数据; 为此,我选择使用Rx将数据库表示为一系列值.

我只想轮询数据库(因此发生我的观察者的通知)最多每5秒一次.现在,我有这样的事情,调度程序正在安排一个周期性任务,导致我的观察者订阅我的数据库的observable:

_scheduler.SchedulePeriodic(_repository, TimeSpan.FromSeconds(5),
    (repo) => repo.AsObservable()
        .Where(item => _SomeFilter(item))
        .Subscribe(item => _SomeProcessFunction(item))
);
Run Code Online (Sandbox Code Playgroud)

为简洁起见省略了功能名称等; repo.AsObservable()只是一个函数,它返回IObservable<T>存储库中所有项目的一部分.

现在,我认为这是正确的做事方式,但是在我提出这个解决方案之前,我确实提出了一个不同的解决方案,其中我Observable.Timer订阅的观察者会订阅AsObservable()每个计时器滴答的返回值.

我的问题是,这看起来很奇怪 - 为什么我多次订阅这个可观察的?

对不起,如果这个问题令人困惑,它在编写时会让我感到困惑,但调度程序对我来说也很困惑:P

cwh*_*ris 6

如果您使用内置运算符而不是手动调度任务,该怎么办?

repo.AsObservable()
    .Where(_SomeFilter)
    // Wait 5 seconds before completing
    .Concat(Observable.Empty<T>().Delay(TimeSpan.FromSeconds(5))
    // Resubscribe indefinitely after source completes
    .Repeat()
    // Subscribe
    .Subscribe(_SomeProcessFunction);
Run Code Online (Sandbox Code Playgroud)

  • 我会注意到(对于任何遇到此问题的人),出于单元测试目的传入调度程序仍然很重要。编写的这段代码至少需要 5 秒来运行单元测试。如果您要针对此代码或类似代码运行十几个测试,那将是一个问题。调度程序(或更准确地说是 TestScheduler)允许您“伪造”时间的流逝以非常快速地测试通常会长时间运行的进程。 (3认同)