线程调度程序,可以在同一个驱动器上批量文件操作到同一个线程?

Cri*_*scu 2 .net parallel-processing system.reactive

我想使用Rx扩展来处理长文件绑定操作的并行化.

工作流程是这样的:

  • 在多个驱动器上搜索给定的文件模式(假设每个驱动器都在一个单独的物理设备上)
  • 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程 - 希望最小化随机搜索.
  • 对不同驱动器上的文件的操作应排队到不同的线程以允许并行处理.

我的问题是:我应该使用什么Rx调度程序(或调度程序的组合)?

svi*_*ick 6

为此,了解每个Rx可观察订阅是连续工作非常有用.也就是说,对于单个observable的单个订阅,您可以确保onNext一个项目的委托onNext在以下项目开始之前完成.

默认情况下,onNext委托在当前线程(调用的线程OnNext())上执行,但您可以通过使用来更改它ObserveOn().

这对您来说意味着您应该为每个物理驱动器创建一个单独的observable,并在一个单独的线程上观察每个物理驱动器.一种方法是,如果要执行单个可观察的操作,则使用GroupBy().

使用哪种特定的调度程序?我认为这几乎没关系.ObserveOn()似乎使用ScheduleLongRunning()它,如果它可用,对于最常见的调度程序意味着它将创建一个新的观察线程.

把所有这些放在一起,你的代码看起来像:

operations.GroupBy(op => op.Drive)
          .Select(o => o.ObserveOn(TaskPoolScheduler.Default))
          .Do(o => o.Subscribe(op => op.Execute()))
          .Subscribe();
Run Code Online (Sandbox Code Playgroud)

(假设operations您的操作类型是可观察的,它具有Drive属性和Execute()方法.)