Cri*_*scu 2 .net parallel-processing system.reactive
我想使用Rx扩展来处理长文件绑定操作的并行化.
工作流程是这样的:
我的问题是:我应该使用什么Rx调度程序(或调度程序的组合)?
为此,了解每个Rx可观察订阅是连续工作非常有用.也就是说,对于单个observable的单个订阅,您可以确保onNext一个项目的委托onNext在以下项目开始之前完成.
默认情况下,onNext委托在当前线程(调用的线程OnNext())上执行,但您可以通过使用来更改它ObserveOn().
这对您来说意味着您应该为每个物理驱动器创建一个单独的observable,并在一个单独的线程上观察每个物理驱动器.一种方法是,如果要执行单个可观察的操作,则使用GroupBy().
使用哪种特定的调度程序?我认为这几乎没关系.ObserveOn()似乎使用ScheduleLongRunning()它,如果它可用,对于最常见的调度程序意味着它将创建一个新的观察线程.
把所有这些放在一起,你的代码看起来像:
operations.GroupBy(op => op.Drive)
.Select(o => o.ObserveOn(TaskPoolScheduler.Default))
.Do(o => o.Subscribe(op => op.Execute()))
.Subscribe();
Run Code Online (Sandbox Code Playgroud)
(假设operations您的操作类型是可观察的,它具有Drive属性和Execute()方法.)