Gru*_*kin 8 c# parallel-processing multithreading producer-consumer task-parallel-library
我正在使用C#TPL并且我遇到了生产者/消费者代码的问题...出于某种原因,TPL不重用线程并不断创建新线程
我做了一个简单的例子来演示这种行为:
class Program
{
static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
static CancellationTokenSource m_Cts = new CancellationTokenSource();
static void Producer()
{
try
{
while (!m_Cts.IsCancellationRequested)
{
Console.WriteLine("Enqueuing job");
m_Buffer.Add(0);
Thread.Sleep(1000);
}
}
finally
{
m_Buffer.CompleteAdding();
}
}
static void Consumer()
{
Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
}
static void Run(int i)
{
Console.WriteLine
("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
Thread.CurrentThread.ManagedThreadId,
Process.GetCurrentProcess().Threads.Count);
}
static void Main(string[] args)
{
Task producer = new Task(Producer);
Task consumer = new Task(Consumer);
producer.Start();
consumer.Start();
Console.ReadKey();
m_Cts.Cancel();
Task.WaitAll(producer, consumer);
}
}
Run Code Online (Sandbox Code Playgroud)
此代码创建2个任务,生产者和消费者.Produces每秒添加1个工作项,而Consumer只打印出包含信息的字符串.我认为在这种情况下1个消费者线程就足够了,因为任务的处理速度比添加到队列的速度要快得多,但实际发生的是,进程中每隔一个线程数增加1 ...就好像TPL正在为每个项目创建新线程
在试图了解发生了什么后,我还注意到了另一件事:即使BlockingCollection大小为1,一段时间后消费者开始以突发方式进行调用,例如,这就是它的启动方式:
入队作业
作业处理线程:4处理线程数:9
入队作业
作业处理线程:6处理线程数:9
入队作业
作业处理线程:5处理线程数:10
入队作业
作业处理线程:4处理线程数:10
入队作业
作业处理线程:6处理线程数:11
这就是它不到一分钟后处理物品的方式:
入队作业
作业处理线程:25处理线程数:52
入队作业
入队作业
作业处理线程:5处理线程数:54
作业处理线程:5处理线程数:54
并且因为线程在完成Parallel.ForEach循环之后被释放(我在这个例子中没有显示它,但它在真实的项目中)我假设它与ForEach具体有关...我发现这个artice http: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/,我认为我的问题是由这个默认分区程序引起的,所以我从TPL示例中选择了自定义分区程序这是逐个提供消费者线程项目,虽然它修复了执行顺序(摆脱了延迟)......
入队作业
作业处理线程:71处理线程数:140
入队作业
作业处理线程:12处理线程数:141
入队作业
作业处理线程:72处理线程数:142
入队作业
作业处理线程:38处理线程数:143
入队作业
作业处理线程:73处理线程数:143
入队作业
作业处理线程:21处理线程数:144
入队作业
作业处理线程:74处理线程数:145
......它并没有阻止线程的增长
我知道ParallelOptions.MaxDegreeOfParallelism,但我仍然想了解TPL发生了什么,以及为什么它无缘无故地创建了数百个线程
在我的项目中,我需要运行几个小时并从数据库中读取新数据的代码,将其放入BlockingCollections并由其他代码处理数据,每5秒钟就有1个新项目,需要几毫秒到几乎一个分钟处理它,并在运行大约10分钟后,线程数超过1000个线程
有两件事共同导致这种行为:
ThreadPool
尝试为您的情况使用最佳线程数.但是如果池中的某个线程阻塞,则池会将此视为该线程没有执行任何有用的工作,因此很快就会创建另一个线程.这意味着如果你有很多阻塞,ThreadPool
那么猜测最佳线程数是非常糟糕的,它往往会创建新线程,直到达到极限.
Parallel.ForEach()
ThreadPool
除非您明确设置最大线程数,否则信任猜测正确的线程数.Parallel.ForEach()
也主要用于有界集合,而不是数据流.
当你将这两件事结合起来时GetConsumingEnumerable()
,你得到的是Parallel.ForEach()
创造几乎总是被阻挡的线程.在ThreadPool
看到这一点,并且,要尽量保持CPU的使用,造成越来越多的线程.
这里正确的解决方案是设置MaxDegreeOfParallelism
.如果您的计算受CPU限制,则最有可能是最佳值Environment.ProcessorCount
.如果它们是IO绑定的,则必须通过实验找出最佳值.
如果可以使用.Net 4.5,另一个选择是使用TPL Dataflow.这个库专门用于处理数据流,就像你一样,所以它没有你的代码所带来的问题.它实际上甚至比那更好,并且当它当前没有处理任何东西时根本不使用任何线程.
注意:还有一个很好的理由为每个新项目创建一个新线程,但解释这将需要我解释如何Parallel.ForEach()
更详细地工作,我觉得这里没有必要.