如何提高 Parallel.ForEach 的吞吐量

eri*_*ikH 3 c# .net-4.0 parallel.foreach

我尝试通过并行执行来优化代码,但有时只有一个线程承担所有重负载。下面的例子展示了如何在最多 4 个线程中执行 40 个任务,并且前十个任务比其他的更耗时。

Parallel.ForEach似乎将数组分成 4 部分,并让一个线程处理每个部分。所以整个执行过程大约需要 10 秒。它应该能够在最多 3.3 秒内完成!

有没有办法一直使用所有线程,因为在我的实际问题中不知道哪些任务耗时?

var array = System.Linq.Enumerable.Range(0, 40).ToArray();

System.Threading.Tasks.Parallel.ForEach(array, new System.Threading.Tasks.ParallelOptions() { MaxDegreeOfParallelism = 4, },
     i =>
     {
         Console.WriteLine("Running index {0,3} : {1}", i, DateTime.Now.ToString("HH:mm:ss.fff"));
         System.Threading.Thread.Sleep(i < 10 ? 1000 : 10);
     });
Run Code Online (Sandbox Code Playgroud)

Ser*_*rvy 5

Parallel.ForEach可以实现,但您需要使用自定义分区程序(或找到第 3 方分区程序),以便能够根据您的特定项目更明智地对元素进行分区。(或者只是使用小得多的批次。)

这也是假设您事先并不严格知道哪些项目会很快,哪些会很慢;如果你这样做了,你可以在打电话之前自己重新订购物品,ForEach这样昂贵的物品就会更加分散。视情况而定,这可能不够,也可能不够。

一般来说,我更喜欢通过简单地让一个生产者和多个消费者来解决这些问题,每个消费者一次处理一个项目,而不是批量处理。该BlockingCollection班使这些情况相当简单。只需将所有项目添加到集合中,创建 N 个任务/线程/等,每个任务/线程/等,每个任务都会抓取一个项目并对其进行处理,直到没有更多项目为止。它不会为您提供 Parallel.ForEach 为您提供的动态添加/删除线程,但这似乎不是您的问题。


svi*_*ick 5

使用自定义分区器是修改Parallel.ForEach(). 如果您使用 .Net 4.5,则可以使用超载Partitioner.Create()。有了它,您的代码将如下所示:

var partitioner = Partitioner.Create(
    array, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(
    partitioner, new ParallelOptions { MaxDegreeOfParallelism = 4, }, i => …);
Run Code Online (Sandbox Code Playgroud)

这不是默认设置,因为关闭缓冲会增加Parallel.ForEach(). 但是,如果您的迭代真的那么长(几秒),那么额外的开销应该不会很明显。