了解Parallel.Invoke,创建和重用线程

Kja*_*ara 5 c# multithreading

我试图了解如何Parallel.Invoke创建和重用线程。我运行了以下示例代码(来自MSDN,https://msdn.microsoft.com/zh-cn/library/dd642243(v = vs.110.aspx):

using System;
using System.Threading;
using System.Threading.Tasks;

class ThreadLocalDemo
{
        static void Main()
        {
            // Thread-Local variable that yields a name for a thread
            ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
            {
                return "Thread" + Thread.CurrentThread.ManagedThreadId;
            });

            // Action that prints out ThreadName for the current thread
            Action action = () =>
            {
                // If ThreadName.IsValueCreated is true, it means that we are not the
                // first action to run on this thread.
                bool repeat = ThreadName.IsValueCreated;

                Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
            };

            // Launch eight of them. On 4 cores or less, you should see some repeat ThreadNames
            Parallel.Invoke(action, action, action, action, action, action, action, action);

            // Dispose when you are done
            ThreadName.Dispose();
        }
}
Run Code Online (Sandbox Code Playgroud)

据我了解,Parallel.Invoke尝试在此处创建8个线程-每个动作一个。因此,它将创建第一个线程,运行第一个线程,并由此为该线程action提供a ThreadName。然后,它创建下一个线程(得到不同的ThreadName),依此类推。

如果无法创建新线程,它将重用之前创建的线程之一。在这种情况下,值repeatwill将true在控制台输出中看到。

这是正确的吗?

倒数第二个注释(“启动其中的八个。在4个或更少的内核上,您应该会看到一些重复的ThreadNames”)表示由创建的Invoke线程与处理器的可用cpu线程相对应:在4个内核上,我们有8个cpu线程,至少有一个忙(正在运行操作系统和东西),因此Invoke只能使用7个不同的线程,因此我们必须至少获得一个"repeat"

我对此评论的解释正确吗?

我在装有Intel®Core™i7-2860QM处理器(即4核,8 cpu线程)的PC上运行了此代码。我期望至少得到一个"repeat",但是没有。当我将改为Invoke10个而不是8个动作时,得到以下输出:

ThreadName = Thread6
ThreadName = Thread8
ThreadName = Thread6 (repeat)
ThreadName = Thread5
ThreadName = Thread3
ThreadName = Thread1
ThreadName = Thread10
ThreadName = Thread7
ThreadName = Thread4
ThreadName = Thread9
Run Code Online (Sandbox Code Playgroud)

因此,控制台应用程序中至少有9个不同的线程。这与我的处理器只有8个线程的事实相矛盾。

因此,我想我的某些推理是错误的。是否Parallel.Invoke工作方式不同比上述我?如果是,怎么办?

Evk*_*Evk 3

如果您将少于 10 个项目传递给Parallel.Invoke,并且您没有MaxDegreeOfParallelism在选项中指定(所以 - 您的情况),它只会使用以下代码在线程池调度器上并行运行它们:

var actions = new [] { action, action, action, action, action, action, action, action };
var tasks = new Task[actions.Length];
for (int index = 1; index < tasks.Length; ++index)
    tasks[index] = Task.Factory.StartNew(actions[index]);
tasks[0] = new Task(actions[0]);
tasks[0].RunSynchronously();
Task.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)

所以只是一个普通的Task.Factory.StartNew。如果您查看线程池中的最大线程数

int th, io;
ThreadPool.GetMaxThreads(out th, out io);
Console.WriteLine(th);
Run Code Online (Sandbox Code Playgroud)

您会看到一些大数字,例如 32767。因此,将执行的线程数Parallel.Invoke(在您的情况下)根本不限于 cpu 核心数。即使在 1 核 cpu 上,它也可能并行运行 8 个线程。

然后您可能会想,为什么有些线程会被重用?因为当线程池线程上的工作完成时,该线程将返回到池中并准备好接受新工作。您的示例中的操作基本上根本不执行任何操作,并且完成得非常快。因此,有时通过启动的第一个线程Task.Factory.StartNew已经完成了您的操作,并在所有后续线程启动之前返回到池中。这样该线程就被重用了。

顺便说一句,您可以(repeat)在示例中看到在 8 核(16 个逻辑核心)处理器上有 8 个操作,如果您足够努力,甚至可以看到 7 个操作。

更新以回答您的评论。线程池调度程序不必立即创建新线程。线程池中有最小和最大线程数。如何查看最大值我已经在上面显示了。要查看最小数量:

int th, io;
ThreadPool.GetMinThreads(out th, out io);
Run Code Online (Sandbox Code Playgroud)

该数字通常等于核心数量(例如 8)。现在,当您请求在线程池线程上执行新操作,并且线程池中的线程数小于最小值时,将立即创建新线程。但是,如果可用线程数大于最小值 - 在创建新线程之前将引入一定的延迟(不幸的是,我不记得具体多长时间,大约 500 毫秒)。

您在评论中添加的语句我非常怀疑能否在 2-3 秒内执行。对我来说它最多执行 0.3 秒。因此,当线程池创建前 8 个线程时,在创建第 9 个线程之前会有 500 毫秒的延迟。在此延迟期间,前 8 个线程中的部分(或全部)已完成其工作并可用于新工作,因此无需创建新线程并且可以重用它们。

为了验证这一点,引入更大的延迟:

static void Main()
{
    // Thread-Local variable that yields a name for a thread
    ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
    {
        return "Thread" + Thread.CurrentThread.ManagedThreadId;
    });

    // Action that prints out ThreadName for the current thread
    Action action = () =>
    {
        // If ThreadName.IsValueCreated is true, it means that we are not the
        // first action to run on this thread.
        bool repeat = ThreadName.IsValueCreated;            
        Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
        Thread.Sleep(1000000);
    };
    int th, io;
    ThreadPool.GetMinThreads(out th, out io);
    Console.WriteLine("cpu:" + Environment.ProcessorCount);
    Console.WriteLine(th);        
    Parallel.Invoke(Enumerable.Repeat(action, 100).ToArray());        

    // Dispose when you are done
    ThreadName.Dispose();
    Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)

您将看到,现在线程池每次都必须创建新线程(比核心数量多得多),因为它无法在繁忙时重用以前的线程。

您还可以增加线程池中的最小线程数,如下所示:

int th, io;
ThreadPool.GetMinThreads(out th, out io);
ThreadPool.SetMinThreads(100, io);
Run Code Online (Sandbox Code Playgroud)

这将消除延迟(直到创建 100 个线程),在上面的示例中您会注意到这一点。