Jak*_*rew 5 c# parallel-processing multithreading concurrentdictionary blockingcollection
我有一个BlockingCollection(ConcurrentBag,50000),我试图为生产者线程使用一个非常小的有限容量50,000,以便最大化我可以在我的消费者线程的ConcurrentDictionary中处理的记录数.生产者比消费者快得多,并且会消耗大部分内存.
不幸的是,我立即注意到我的ConcurrentDictionary中的记录总数现在大大低于在我的测试数据执行时添加50,000的有界容量后的记录.我读到BlockingCollection的.add方法应该无限期地阻塞,直到集合中有空间来执行add.但是,情况似乎并非如此.
问题:
如果在BlockingCollection中的容量释放之前调用了太多的add,那么BlockingCollection的.add方法最终会超时或者无声地失败吗?
如果#1的答案是肯定的,那么在超过限制容量而不丢失数据后我可以尝试多少次添加?
如果调用了许多正在等待/阻塞容量的BlockingCollection .add()方法并且调用了CompleteAdding()方法,那些等待/阻塞添加会继续等待然后最终添加还是静默失败?
Jak*_*rew 11
如果您正在使用BlockingCollection和ConcurrentDictionary,请确保您没有在代码中隐藏的BlockingCollection.TryAdd(myobject)方法,并将其误认为是ConcurrentDictionary.TryAdd()方法.如果已超出BlockingCollection的边界容量,则BlockingCollection.TryAdd(myobject)将返回false并丢弃生成"静默失败"的添加请求.
关于绩效的最后说明
看来(在我自己的情况下,无论如何)在BlockingCollection上使用Bounding Capacity和.Add()与在同一进程中不使用Bounding Capacity和.TryAdd()相比非常慢.
通过实施自己的边界容量策略,我获得了更好的性能结果.有很多方法可以做到这一点.三个选项包括与Monitor.PulseAll()一起使用的Thread.Sleep(),Thread.Spinwait()或Monitor.Wait().当使用其中一个策略时,也可以使用BlockingCollection.TryAdd()而不是BlockingCollection.Add()并且没有限制容量而不会丢失任何数据或内存不足.这种方法似乎也能产生更好的性能.
您可以从三个示例中进行选择,这三个示例基于哪种方案最适合Producer和Consumer线程中的速度差异.
Thread.Wait()示例:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the bounded capacity has been exceeded
//place the thread in wait mode
Thread.Sleep(SleepTime);
}
Run Code Online (Sandbox Code Playgroud)
Thread.SpinWait()示例:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the capacity has been exceeded
//place the thread in wait mode
Thread.SpinWait(SpinCount);
}
Run Code Online (Sandbox Code Playgroud)
Monitor.Wait()示例
此示例在Producer和Consumer方面都需要一个钩子.
制片人代码
//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count > 50000)
{
Monitor.Wait(syncLock, 1000);
}
}
}
Run Code Online (Sandbox Code Playgroud)
消费者代码
//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count < 40000)
{
Monitor.PulseAll(syncLock);
}
}
}
Run Code Online (Sandbox Code Playgroud)