Pav*_*vol 4 c# parallel-processing task parallel.foreach partitioner
我正在尝试使用 C# 应用程序尽可能快地处理数字。我使用 aThread.Sleep()来模拟处理和随机数。我使用 3 种不同的技术。
这是我使用的测试代码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
internal class Program
{
private static void Main()
{
var data = new int[500000];
var random = new Random();
for (int i = 0; i < 500000; i++)
{
data[i] = random.Next();
}
var partialTimes = new Dictionary<int, double>();
var iterations = 5;
for (int i = 1; i < iterations + 1; i++)
{
Console.Write($"ProcessData3 {i}\t");
StartProcessing(data, partialTimes, ProcessData3);
GC.Collect();
}
Console.WriteLine();
Console.WriteLine("Press Enter to Exit");
Console.ReadLine();
}
private static void StartProcessing(int[] data, Dictionary<int, double> partialTimes, Action<int[], Dictionary<int, double>> processData)
{
var stopwatch = Stopwatch.StartNew();
try
{
processData?.Invoke(data, partialTimes);
stopwatch.Stop();
Console.WriteLine($"{stopwatch.Elapsed.ToString(@"mm\:ss\:fffffff")} total = {partialTimes.Sum(s => s.Value)} max = {partialTimes.Values.Max()}");
}
finally
{
partialTimes.Clear();
}
}
private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
{
Parallel.ForEach(data, number =>
{
var partialStopwatch = Stopwatch.StartNew();
Thread.Sleep(1);
partialStopwatch.Stop();
lock (partialTimes)
{
partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
}
});
}
private static void ProcessData3(int[] data, Dictionary<int, double> partialTimes)
{
// Partition the entire source array.
var rangePartitioner = Partitioner.Create(0, data.Length);
// Loop over the partitions in parallel.
Parallel.ForEach(rangePartitioner, (range, loopState) =>
{
// Loop over each range element without a delegate invocation.
for (int i = range.Item1; i < range.Item2; i++)
{
var number = data[i];
var partialStopwatch = Stopwatch.StartNew();
Thread.Sleep(1);
partialStopwatch.Stop();
lock (partialTimes)
{
partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
}
}
});
}
private static void ProcessData2(int[] data, Dictionary<int, double> partialTimes)
{
var tasks = new Task[data.Count()];
for (int i = 0; i < data.Count(); i++)
{
var number = data[i];
tasks[i] = Task.Factory.StartNew(() =>
{
var partialStopwatch = Stopwatch.StartNew();
Thread.Sleep(1);
partialStopwatch.Stop();
lock (partialTimes)
{
partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
}
});
}
Task.WaitAll(tasks);
}
}
}
Run Code Online (Sandbox Code Playgroud)
对于每种技术,我都会重新启动程序。我得到了这些结果,
有Thread.Sleep( 1 ):
ProcessData1 1 00:56:1796688 total = 801335,282599955 max = 16,8783
ProcessData1 2 00:23:5390014 total = 816167,642100022 max = 14,5913
ProcessData1 3 00:14:7090566 total = 827589,675899998 max = 13,2617
ProcessData1 4 00:10:8929177 total = 829296,528300007 max = 15,0175
ProcessData1 5 00:10:6333310 total = 839282,123200008 max = 29,2738
ProcessData2 1 00:37:8084153 total = 824507,174200022 max = 112,071
ProcessData2 2 00:16:3762096 total = 849272,47810001 max = 77,1514
ProcessData2 3 00:12:9177717 total = 854012,353100029 max = 67,5684
ProcessData2 4 00:10:4798701 total = 857396,642899983 max = 92,9408
ProcessData2 5 00:09:2206146 total = 870966,655499989 max = 51,8945
ProcessData3 1 01:13:6814541 total = 803581,718699918 max = 25,6815
ProcessData3 2 01:07:9809277 total = 814069,532899922 max = 26,0671
ProcessData3 3 01:07:9857984 total = 814148,329399928 max = 21,3116
ProcessData3 4 01:07:4812183 total = 808042,695499966 max = 16,8601
ProcessData3 5 01:07:2954614 total = 805895,325499903 max = 23,8517
Run Code Online (Sandbox Code Playgroud)
其中是每个函数总共
total花费的时间,是每个函数的最大时间。Parallel.ForEach()
max
为什么第一个循环这么慢?其他尝试怎么可能处理得这么快?如何在第一次尝试时实现更快的并行处理?
所以我也尝试了一下,Thread.Sleep( 10 )
结果是:
ProcessData1 1 02:50:2845698 total = 5109831,95429994 max = 12,0612
ProcessData1 2 00:56:3361645 total = 5125884,05919954 max = 12,7666
ProcessData1 3 00:53:4911541 total = 5131105,15209993 max = 12,7486
ProcessData1 4 00:49:5665628 total = 5144654,75829992 max = 13,2678
ProcessData1 5 00:46:0218194 total = 5152955,19509996 max = 13,702
ProcessData2 1 01:21:7207557 total = 5121889,31579983 max = 73,8152
ProcessData2 2 00:39:6660074 total = 5175557,68889969 max = 59,369
ProcessData2 3 00:31:9036416 total = 5193819,89889973 max = 56,2895
ProcessData2 4 00:27:4616803 total = 5207168,56969977 max = 65,5495
ProcessData2 5 00:24:4270755 total = 5222567,9044998 max = 65,368
ProcessData3 1 02:44:9985645 total = 5110117,19019997 max = 11,7172
ProcessData3 2 02:25:6533128 total = 5237779,27010012 max = 26,3171
ProcessData3 3 02:22:2771259 total = 5116123,45259975 max = 12,0581
ProcessData3 4 02:22:1678911 total = 5112574,93779995 max = 11,5334
ProcessData3 5 02:21:9418178 total = 5104980,07120004 max = 11,5583
Run Code Online (Sandbox Code Playgroud)
所以第一个循环仍然比其他循环花费更多的秒数..
您所看到的行为完全可以用以下事实来解释:该类ThreadPool会延迟创建新线程,直到经过一小段时间(大约 1 秒......多年来一直在变化)。
将仪器添加到程序中可以提供丰富的信息。在您的示例中,一个非常有用的工具是计算线程池管理的并发线程数,确定“高水位线”(即最终确定的最大线程数),然后使用该数字来覆盖线程池的行为。
当我这样做时,我发现在第一次运行第一个方法时,您最多可以获得大约 25 个线程。但由于线程池的默认设置是仅创建等于计算机上核心数量的线程(在我的例子中为八个),因此创建额外的线程可能需要相当长的时间。当然,在此期间,您获得的吞吐量会比其他情况下明显减少(因此您会产生比仅 20 秒左右达到该线程数所导致的延迟更大的延迟)。
在该测试的后续运行中,最大线程数逐渐上升(因为每次新运行都从上一次运行开始,线程池中已经有更多线程)高达 53 左右。
如果您事先知道线程池需要多少线程才能有效地执行工作,则可以使用该SetMinThreads()方法来增加它将根据需要立即创建的线程数,然后再切换到节流线程创建算法。例如,手头有 53 个线程的高水位线,您可以将最小线程数设置为该数字(或一个不错的整数,例如 50)。
当我这样做时,第一次测试的所有五次运行(之前需要 25 秒到 1 分钟)(当然,较长的运行时间会更早),现在需要大约 19 秒才能完成。
我想强调的是,你应该SetMinThreads()非常小心地使用。一般来说,线程池非常适合管理工作负载。您上面呈现的场景显然只是为了示例而不现实,但它确实存在一个问题,即您一Parallel.ForEach()开始并没有在每次迭代中真正做那么多工作。它似乎不太适合并发,因为大部分时间都花在了开销上。在任何类似的场景中使用SetMinThreads()只会掩盖一个更隐蔽的潜在问题。
您会发现,如果您定制工作负载以更好地匹配可用资源,并最大限度地减少任务和线程之间的转换,则无需覆盖默认线程池数量即可获得良好的吞吐量。
关于这个特定测试的一些其他注释......
请注意,如果您将程序更改为在同一会话中运行所有三个测试(每个运行五次),则“第一次运行时间更长”仅发生在第一个测试中。为了将来的参考,您应该始终处理此类“第一次较慢”的问题,着眼于测试不同的组合和顺序,以验证是否是特定的实现受到影响,或者您是否第一次看到效果测试,无论首先运行哪个实现。有许多实现和平台细节,包括 JIT、线程池、磁盘缓存,它们可能会影响任何算法的初始运行,您需要确保快速缩小搜索范围,以了解您是否正在处理其中之一或您自己的算法中存在一些真正的问题。
顺便说一句,这对你的问题并不重要,但我发现你选择使用数组中的随机数data作为计时字典的键很奇怪。恕我直言,由于随机数的冲突,这些计时值变得无用。您不会每次都进行计数(当发生碰撞时,只会存储该数字的最后一个实例),这意味着显示的“总”时间小于实际花费的总时间,甚至最大值也不会必然是正确的(如果真正的最大值被使用相同密钥的后续值覆盖,您将错过它)。
这是我对第一个测试的修改版本,它显示了我添加的诊断代码,以及(注释掉)用于设置线程池计数以产生更快、更一致的行为的语句:
private static int _threadCount1;
private static int _maxThreadCount1;
private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
{
const int minOverride = 50;
int minMain, minIOCP, maxMain, maxIOCP;
ThreadPool.GetMinThreads(out minMain, out minIOCP);
ThreadPool.GetMaxThreads(out maxMain, out maxIOCP);
WriteLine($"cores: {Environment.ProcessorCount}");
WriteLine($"threads: {minMain} min, {maxMain} max");
// Uncomment two lines below to see uniform behavior across test runs:
//ThreadPool.SetMinThreads(minOverride, minIOCP);
//ThreadPool.SetMaxThreads(minOverride, maxIOCP);
_threadCount1 = _maxThreadCount1 = 0;
Parallel.ForEach(data, number =>
{
int threadCount = Interlocked.Increment(ref _threadCount1);
var partialStopwatch = Stopwatch.StartNew();
Thread.Sleep(1);
partialStopwatch.Stop();
lock (partialTimes)
{
partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
if (_maxThreadCount1 < threadCount)
{
_maxThreadCount1 = threadCount;
}
}
Interlocked.Decrement(ref _threadCount1);
});
ThreadPool.SetMinThreads(minMain, minIOCP);
ThreadPool.SetMaxThreads(maxMain, maxIOCP);
WriteLine($"max thread count: {_maxThreadCount1}");
}
Run Code Online (Sandbox Code Playgroud)