什么是使用Rx以单个并发执行限制在c#中运行周期性任务的好方法?

sma*_*007 4 c# system.reactive

我希望运行周期性任务,其限制是在任何给定时间最多只执行一次方法.

我正在尝试使用Rx,但我不确定一旦并发限制,最多如何强加.

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());
Run Code Online (Sandbox Code Playgroud)

此外,如果任务仍在运行,我希望后续计划过去.即我不希望任务排队并导致问题.

我有2个这样的任务定期执行.正在执行的任务当前是同步的.但是,如果有必要,我可以让它们异步.

pau*_*els 5

你在正确的轨道上,你可以使用Select+Concat来展平 observable 并限制飞行中请求的数量(注意:如果你的任务花费的时间超过间隔时间,那么它们将开始堆积,因为它们不能快速执行足够的):

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();

source.Subscribe(/*Handle the result of each operation*/);
Run Code Online (Sandbox Code Playgroud)


Eni*_*ity 5

您应该按原样测试您的代码,因为这正是Rx已经强加的.

试试这个作为测试:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}
Run Code Online (Sandbox Code Playgroud)

当你运行它时,你会得到这种输出:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119
Run Code Online (Sandbox Code Playgroud)

它已经确保没有重叠.