Observable.Delay或Observable.Buffer重用相同的线程

fsl*_*fsl 2 c# system.reactive

是否有某些版本的Observable.Delay或Observable.Buffer不为其计时器使用新线程?也许精度较低..

我有一个场景,我需要在一个observable上调用Observable.Delay,每秒产生几千条消息,这会产生很多线程.

谢谢.

Jam*_*rld 5

如果要限制线程数,则只需要考虑使用的调度程序,这与创建的定时器数量无关.基于时间的操作调度具有适当时间的操作,并且调度程序决定如何在正确的时间调度事件 - Rx明智地知道实际创建了多少定时器,并且此机制取决于所使用的调度程序.

基于时间的运算符(Scheduler.Default)使用的大多数平台上的默认调度程序将使用任务池来获取调度将来安排的事件的线程,这就是您通常会看到用于调度事件的不同线程的原因.

控制线程的一种方法是使用a EventLoopScheduler并确保在使用基于时间的操作时指定它.这将在同一个线程上发送它的所有事件.例如:

var scheduler = new EventLoopScheduler();

Observable.Return(1)
          .Delay(TimeSpan.FromSeconds(4), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));
Observable.Return(2).Delay(TimeSpan.FromSeconds(2), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));

Console.WriteLine("Done.");
Run Code Online (Sandbox Code Playgroud)

会输出类似的东西(ThreadId当然会有所不同):

Done.
2: 3
1: 3
Run Code Online (Sandbox Code Playgroud)

而如果您将第一行更改为var scheduler = Scheduler.Default;您将看到不同的线程ID.

Rx中的计时器主题相当复杂,对于这种格式可能有点过于宽泛 - 这篇优秀的帖子涵盖了很多内部细节.请参阅"公平的方式"一节"这都是关于时间的."