使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

Gul*_*llu 19 c# multithreading scheduled-tasks pfx task-parallel-library

请参阅下面的伪代码

//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    {
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (!concurrentQueue.IsEmpty())
        {
            if (concurrentQueue.TrydeQueue(out item))
            {
                //long running processing of item
            }
        }
        consumerSignal.WaitOne();
    }
Run Code Online (Sandbox Code Playgroud)

我如何移植这种模式,我从远古时代就开始使用taskfactory创建的任务和net 4的新信号功能.换句话说,如果有人用net 4编写这个模式,它会是什么样子?伪代码很好.如你所见,我已经使用.net 4 concurrentQueue了.如何使用任务并可能使用一些新的信令机制.谢谢

通过Jon/Dan解决我的问题.甜.没有手动信号或while(true)或while(itemstoProcess)类型循环像过去那样

//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 {
     blockingCollection.add(item);
 }

 //somewhere else we have started a consumer like this
 //this supports multiple consumers !
 task(StartConsuming()).Start; 

 void StartConsuming()
 {
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     {
                //long running processing of item
     }
 }

cancellations are handled using cancel tokens
Run Code Online (Sandbox Code Playgroud)

Jon*_*eet 23

你会用的BlockingCollection<T>.文档中有一个例子.

该课程专门设计用于使这一点变得微不足道.

  • @ user666490:Jon已经为生产者消费者问题提供了规范的.Net 4解决方案.来自MSDN:"为实现IProducerConsumerCollection <T>的线程安全集合提供阻塞和绑定功能." (4认同)

Bri*_*eon 11

你的第二块代码看起来更好.但是,开始Task然后立即等待它是毫无意义的.只需调用Take然后处理直接在使用线程上返回的项目.这就是生产者 - 消费者模式的意图.如果您认为工作项目的处理足够密集以保证更多的消费者,那么无论如何都要吸引更多的消费者.BlockingCollection安全的多个生产者多个消费者.

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}
Run Code Online (Sandbox Code Playgroud)