使用任务并行库时线程计数增长

Gru*_*kin 8 c# parallel-processing multithreading producer-consumer task-parallel-library

我正在使用C#TPL并且我遇到了生产者/消费者代码的问题...出于某种原因,TPL不重用线程并不断创建新线程

我做了一个简单的例子来演示这种行为:

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine
            ("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
              Thread.CurrentThread.ManagedThreadId, 
              Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();

        Console.ReadKey();
        m_Cts.Cancel();

        Task.WaitAll(producer, consumer);
    }
}
Run Code Online (Sandbox Code Playgroud)

此代码创建2个任务,生产者和消费者.Produces每秒添加1个工作项,而Consumer只打印出包含信息的字符串.我认为在这种情况下1个消费者线程就足够了,因为任务的处理速度比添加到队列的速度要快得多,但实际发生的是,进程中每隔一个线程数增加1 ...就好像TPL正在为每个项目创建新线程

在试图了解发生了什么后,我还注意到了另一件事:即使BlockingCollection大小为1,一段时间后消费者开始以突发方式进行调用,例如,这就是它的启动方式:

入队作业

作业处理线程:4处理线程数:9

入队作业

作业处理线程:6处理线程数:9

入队作业

作业处理线程:5处理线程数:10

入队作业

作业处理线程:4处理线程数:10

入队作业

作业处理线程:6处理线程数:11

这就是它不到一分钟后处理物品的方式:

入队作业

作业处理线程:25处理线程数:52

入队作业

入队作业

作业处理线程:5处理线程数:54

作业处理线程:5处理线程数:54

并且因为线程在完成Parallel.ForEach循环之后被释放(我在这个例子中没有显示它,但它在真实的项目中)我假设它与ForEach具体有关...我发现这个artice http: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/,我认为我的问题是由这个默认分区程序引起的,所以我从TPL示例中选择了自定义分区程序这是逐个提供消费者线程项目,虽然它修复了执行顺序(摆脱了延迟)......

入队作业

作业处理线程:71处理线程数:140

入队作业

作业处理线程:12处理线程数:141

入队作业

作业处理线程:72处理线程数:142

入队作业

作业处理线程:38处理线程数:143

入队作业

作业处理线程:73处理线程数:143

入队作业

作业处理线程:21处理线程数:144

入队作业

作业处理线程:74处理线程数:145

......它并没有阻止线程的增长

我知道ParallelOptions.MaxDegreeOfParallelism,但我仍然想了解TPL发生了什么,以及为什么它无缘无故地创建了数百个线程

在我的项目中,我需要运行几个小时并从数据库中读取新数据的代码,将其放入BlockingCollections并由其他代码处理数据,每5秒钟就有1个新项目,需要几毫秒到几乎一个分钟处理它,并在运行大约10分钟后,线程数超过1000个线程

svi*_*ick 6

有两件事共同导致这种行为:

  1. ThreadPool尝试为您的情况使用最佳线程数.但是如果池中的某个线程阻塞,则池会将此视为该线程没有执行任何有用的工作,因此很快就会创建另一个线程.这意味着如果你有很多阻塞,ThreadPool那么猜测最佳线程数是非常糟糕的,它往往会创建新线程,直到达到极限.

  2. Parallel.ForEach()ThreadPool除非您明确设置最大线程数,否则信任猜测正确的线程数.Parallel.ForEach()也主要用于有界集合,而不是数据流.

当你将这两件事结合起来时GetConsumingEnumerable(),你得到的是Parallel.ForEach()创造几乎总是被阻挡的线程.在ThreadPool看到这一点,并且,要尽量保持CPU的使用,造成越来越多的线程.

这里正确的解决方案是设置MaxDegreeOfParallelism.如果您的计算受CPU限制,则最有可能是最佳值Environment.ProcessorCount.如果它们是IO绑定的,则必须通过实验找出最佳值.

如果可以使用.Net 4.5,另一个选择是使用TPL Dataflow.这个库专门用于处理数据流,就像你一样,所以它没有你的代码所带来的问题.它实际上甚至比那更好,并且当它当前没有处理任何东西时根本不使用任何线程.

注意:还有一个很好的理由为每个新项目创建一个新线程,但解释这将需要我解释如何Parallel.ForEach()更详细地工作,我觉得这里没有必要.