Meh*_*ban 4 c# multithreading producer-consumer processing-efficiency
我正在寻找实现“一个生产者、多个消费者”多线程应用程序的最佳方案。目前我使用一个队列作为共享缓冲区，但它比一个生产者、一个消费者的情况要慢得多。我打算这样做：
// Number of consumers; each consumer owns one queue and one lock at the same
// index. Mirrors the value main() uses when it starts the consumer threads.
const int N = 100;

// One buffer per consumer. Static because Produce/Consume are static, and
// every slot is created up front: `new Queue<item>[N]` on its own only yields
// N null references, so the first Enqueue would throw.
static readonly Queue<item>[] buffs = CreateBuffers(N);

// Monitor object guarding the queue stored at the same index in `buffs`.
static readonly object[] _locks = CreateLocks(N);

// Builds `count` empty queues.
static Queue<item>[] CreateBuffers(int count)
{
    var queues = new Queue<item>[count];
    for (int i = 0; i < count; i++)
    {
        queues[i] = new Queue<item>();
    }
    return queues;
}

// Builds `count` distinct lock objects.
static object[] CreateLocks(int count)
{
    var gates = new object[count];
    for (int i = 0; i < count; i++)
    {
        gates[i] = new object();
    }
    return gates;
}
// Producer loop: hands items to the per-consumer queues in round-robin order
// and pulses the matching lock after each enqueue. Never returns.
static void Produce()
{
    for (int slot = 0; ; slot = (slot + 1) % N)
    {
        // Produce item;
        object gate = _locks[slot];
        lock (gate)
        {
            buffs[slot].Enqueue(curItem);
            // Wake a thread blocked in Monitor.Wait on this slot's lock, if any.
            Monitor.Pulse(gate);
        }
    }
}
// Consumer loop for the queue at `myIndex`: waits until that queue holds an
// item, removes it while holding the lock, then processes it outside the lock.
// Never returns.
static void Consume(int myIndex)
{
    object gate = _locks[myIndex];
    Queue<item> inbox = buffs[myIndex];
    for (;;)
    {
        item next;
        lock (gate)
        {
            // Re-check the condition after every wake-up; Monitor.Wait releases
            // the lock while blocked and reacquires it before returning.
            while (inbox.Count == 0)
            {
                Monitor.Wait(gate);
            }
            next = inbox.Dequeue();
        }
        // Consume item;
    }
}
// Starts N consumer threads (one per queue index) and then the producer thread.
// NOTE(review): a C# entry point must be spelled `Main`; the name is left
// unchanged here so anything that already calls main() keeps working.
static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for (int i = 0; i < N; i++)
    {
        // Consume takes an int, so it matches neither ThreadStart (no
        // parameter) nor ParameterizedThreadStart (object parameter); wrap the
        // call in a lambda instead. The loop variable is copied so the lambda
        // sees this iteration's value rather than whatever `i` holds when the
        // thread actually runs.
        int index = i;
        consumers[i] = new Thread(() => Consume(index));
        consumers[i].Start();
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
Run Code Online (Sandbox Code Playgroud)
// Buffer shared by the producer and all consumers (no capacity bound is
// passed). Static so the static Produce/Consume methods can reach it, and
// readonly so the instance every thread synchronizes through is never replaced.
static readonly BlockingCollection<item> _buffer = new BlockingCollection<item>();
// Producer: adds items to the shared buffer until production stops, then marks
// the buffer as complete so consumers enumerating it can finish.
static void Produce()
{
    try
    {
        while (true)
        {
            // Produce item; `break` here once there is nothing left to produce.
            _buffer.Add(curItem);
        }
    }
    finally
    {
        // Runs however the loop ends (break or exception). Placed directly
        // after `while (true)` this call would be unreachable, and consumers
        // blocked on GetConsumingEnumerable() would wait forever.
        _buffer.CompleteAdding();
    }
}
// Consumer: takes items from the shared buffer one at a time, blocking while it
// is empty; the loop ends once the buffer has been marked complete and drained.
// `myIndex` is accepted for the caller's benefit but not used here.
static void Consume(int myIndex)
{
    var pending = _buffer.GetConsumingEnumerable();
    foreach (item curItem in pending)
    {
        // Consume item;
    }
}
// Starts N consumer threads that all read from the shared buffer, then starts
// the producer thread.
// NOTE(review): a C# entry point must be spelled `Main`; the name is left
// unchanged here so anything that already calls main() keeps working.
static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for (int i = 0; i < N; i++)
    {
        // Consume takes an int, so it matches neither ThreadStart (no
        // parameter) nor ParameterizedThreadStart (object parameter); wrap the
        // call in a lambda instead. The loop variable is copied so the lambda
        // sees this iteration's value rather than whatever `i` holds when the
        // thread actually runs.
        int index = i;
        consumers[i] = new Thread(() => Consume(index));
        consumers[i].Start();
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
Run Code Online (Sandbox Code Playgroud)
如果您不想从一开始就指定线程数，则可以使用 Parallel.ForEach。
// Processes a single item, received as `curItem`.
// The body is a placeholder in this snippet.
static void Consume(item curItem)
{
// consume item
}
// Starts the producer on its own thread, then lets Parallel.ForEach decide how
// many workers call Consume for the items coming out of the shared buffer.
void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    // The per-item callback is the Consume(item) method (there is no
    // `Consumer`), and the statement needs its terminating semicolon.
    // NOTE(review): GetConsumingPartitioner() is not a BlockingCollection<T>
    // member; it presumably comes from an extension method defined elsewhere
    // (e.g. the ParallelExtensionsExtras samples) - confirm it is referenced.
    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consume);
}
Run Code Online (Sandbox Code Playgroud)