Rx扩展:Parallel.ForEach在哪里?

Ach*_*him 8 c# .net-3.5 system.reactive

我有一段代码正在使用Parallel.ForEach,可能基于旧版本的Rx扩展或任务并行库.我安装了当前版本的Rx扩展但无法找到Parallel.ForEach.我没有使用库中的任何其他花哨的东西,只是想像这样并行处理一些数据:

Parallel.ForEach(records, ProcessRecord);
Run Code Online (Sandbox Code Playgroud)

我发现了这个问题,但我不想依赖旧版本的Rx.但是我无法为Rx找到类似的东西,那么使用当前Rx版本的当前和最直接的方法是什么?该项目使用的是.NET 3.5.

Ana*_*tts 27

如果你有Rx,不需要做所有这些愚蠢的goosery:

records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .ToList()
    .First();
Run Code Online (Sandbox Code Playgroud)

(或者,如果您希望以效率为代价维护项目的顺序):

records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .Concat()
    .ToList()
    .First();
Run Code Online (Sandbox Code Playgroud)

或者,如果您想同时限制多少项:

records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();
Run Code Online (Sandbox Code Playgroud)

  • 已更新,包括限制并发 (2认同)