Tro*_*erg 4 c# parallel-processing blocking task-parallel-library parallel.foreach
我一直在Parallel.ForEach
对项目集合进行一些耗时的处理。该处理实际上是由外部命令行工具处理的,我无法更改它。然而,似乎Parallel.ForEach
会“卡在”集合中长期运行的项目上。我已经将问题提炼出来,并且可以表明Parallel.ForEach
,事实上,等待这个漫长的过程完成并且不允许任何其他人通过。我编写了一个控制台应用程序来演示该问题:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace testParallel
{
class Program
{
static int inloop = 0;
static int completed = 0;
static void Main(string[] args)
{
// initialize an array integers to hold the wait duration (in milliseconds)
var items = Enumerable.Repeat(10, 1000).ToArray();
// set one of the items to 10 seconds
items[50] = 10000;
// Initialize our line for reporting status
Console.Write(0.ToString("000") + " Threads, " + 0.ToString("000") + " completed");
// Start the loop in a task (to avoid SO answers having to do with the Parallel.ForEach call, itself, not being parallel)
var t = Task.Factory.StartNew(() => Process(items));
// Wait for the operations to compelte
t.Wait();
// Report finished
Console.WriteLine("\nDone!");
}
static void Process(int[] items)
{
// SpinWait (not sleep or yield or anything) for the specified duration
Parallel.ForEach(items, (msToWait) =>
{
// increment the counter for how many threads are in the loop right now
System.Threading.Interlocked.Increment(ref inloop);
// determine at what time we shoule stop spinning
var e = DateTime.Now + new TimeSpan(0, 0, 0, 0, msToWait);
// spin until the target time
while (DateTime.Now < e) /* no body -- just a hard loop */;
// count another completed
System.Threading.Interlocked.Increment(ref completed);
// we're done with this iteration
System.Threading.Interlocked.Decrement(ref inloop);
// report status
Console.Write("\r" + inloop.ToString("000") + " Threads, " + completed.ToString("000") + " completed");
});
}
}
}
Run Code Online (Sandbox Code Playgroud)
基本上,我创建一个 int 数组来存储给定操作所需的毫秒数。我将它们全部设置为 10,除了一个设置为 10000(即 10 秒)。我启动Parallel.ForEach
一个任务并在硬旋转等待中处理每个整数(所以它不应该产生或睡眠或任何东西)。在每次迭代中,我都会报告当前循环体中有多少次迭代,以及我们已经完成了多少次迭代。大多数情况下,一切进展顺利。然而,到最后(时间方面),它报告“001 个线程,987 个已完成”。
我的问题是为什么它不使用其他 7 个核心来处理剩余的 13 个“工作”?这一长时间运行的迭代不应该阻止它处理集合中的其他元素,对吗?
这个例子恰好是一个固定集合,但它可以很容易地设置为一个可枚举的。我们不想仅仅因为一项花费了很长时间就停止获取可枚举中的下一项。
我找到了答案(或者至少是一个答案)。它与块分区有关。这里的答案很适合我。所以基本上,在我的“流程”功能的顶部,如果我对此进行更改:
static void Process(int[] items)
{
Parallel.ForEach(items, (msToWait) => { ... });
}
Run Code Online (Sandbox Code Playgroud)
对此
static void Process(int[] items)
{
var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, (msToWait) => { ... });
}
Run Code Online (Sandbox Code Playgroud)
它一次只抓取一项工作。对于每个并行的更典型的情况,其中主体不需要超过一秒,我当然可以看到对工作集进行分块。然而,在我的用例中,每个身体部位可能需要半秒到 5 个小时。我当然不希望一堆 10 秒的综艺元素被一个 5 小时的元素挡住。因此,在这种情况下,“一次一个”的开销是非常值得的。