Parallel.ForEach是否限制活动线程的数量?

Jad*_*ias 101 .net c# parallel-processing c#-4.0

鉴于此代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});
Run Code Online (Sandbox Code Playgroud)

所有1000个线程几乎会同时产生吗?

Jon*_*eet 142

不,它不会启动1000个线程 - 是的,它将限制使用的线程数.Parallel Extensions使用适当数量的内核,具体取决于您拥有的内核数量以及已经忙碌的内核数量.它为每个核心分配工作,然后使用一种称为工作窃取的技术,让每个线程有效地处理自己的队列,并且只在需要时才需要进行任何昂贵的跨线程访问.

看一看在PFX团队博客负载有关如何分配工作的信息和各种其他主题.

请注意,在某些情况下,您也可以指定所需的并行度.

  • 我正在使用Parallel.ForEach(FilePathArray,path => ...今晚读取大约24,000个文件,为我读入的每个文件创建一个新文件.非常简单的代码.看起来甚至6个线程足以压倒7200 RPM磁盘我正在以100%的利用率阅读.在几个小时的时间里,我看到Parallel库分离了超过8,000个线程.我使用MaxDegreeOfParallelism进行了测试,确定8000+线程已经消失了.我现在已经多次测试它了结果. (2认同)

Mic*_*per 26

在一个单核心机器上... Parallel.ForEach分区(块)它正在多个线程之间进行处理,但是这个数字是根据一个算法来计算的,该算法考虑到并且似乎在不断地监视由它分配给ForEach的线程.因此,如果ForEach的主体部分调用长时间运行的IO绑定/阻塞函数,这将使线程等待,算法将产生更多线程并在它们之间重新分配集合.例如,如果线程快速完成并且不在IO线程上阻塞,例如简单地计算一些数字,则算法将线程数增加(或实际上减少)到算法认为对吞吐量最佳的点(平均完成)每次迭代的时间).

基本上所有各种并行库函数背后的线程池将计算出最佳的线程数量.物理处理器核心的数量只是等式的一部分.核心数量与产生的线程数量之间没有简单的一对一关系.

我没有找到有关取消和处理同步线程的文档非常有用.希望MS可以在MSDN中提供更好的示例.

不要忘记,必须编写正文代码以在多个线程上运行,以及所有常见的线程安全考虑因素,框架不会抽象那个因素......

  • 没错,对于IO,我调试自己时可能会分配+100个线程 (2认同)

Col*_*kay 5

它根据处理器/核心的数量计算出最佳线程数.他们不会立刻产卵.


Kev*_*son 5

请参阅并行.每次迭代使用一个任务吗?想要使用"心理模型".然而,作者确实声明"在一天结束时,重要的是要记住实施细节可能随时发生变化."


Tim*_*lez 5

很好的问题。在您的示例中,即使在四核处理器上,并行化级别也非常低,但是经过一些等待,并行化级别可能会变得非常高。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Run Code Online (Sandbox Code Playgroud)

现在看看当添加等待操作来模拟 HTTP 请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Run Code Online (Sandbox Code Playgroud)

我还没有做任何改变,并发/并行化的水平已经戏剧性地跃升了。并发的限制可以随着 增加ParallelOptions.MaxDegreeOfParallelism

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Run Code Online (Sandbox Code Playgroud)

我建议设置ParallelOptions.MaxDegreeOfParallelism。它不一定会增加正在使用的线程数,但它会确保您只启动合理数量的线程,这似乎是您所关心的。

最后回答您的问题,不,您不会立即启动所有线程。如果您希望完美地并行调用,例如测试竞争条件,请使用 Parallel.Invoke。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
Run Code Online (Sandbox Code Playgroud)