将BlockingCollection <T>用作单生成器,单用户FIFO查询是否合适?

jav*_*red 7 .net c# multithreading .net-4.0 producer-consumer

我需要单生成器,单用户FIFO查询,因为

  • 我需要按照收到的顺序处理邮件.
  • 我需要这样做异步,因为调用者不应该在我处理消息时等待.
  • 只有在前一个消息处理完成时才应启动下一个消息处理.有时"接收"消息的频率高于"处理"消息的频率.但平均而言,我应该能够处理所有消息,有时候我必须"排队"它们.

所以它非常像我认为的TCP/IP,你有一个生产者和一个消费者,有时你可以比你可以处理的更快地接收消息,所以你必须查询它们.在哪里订单很重要,并且调用者绝对不感兴趣你用这些东西做什么.

这听起来很容易,我可能会使用一般Queue,但我想用BlockingCollection它,因为我不想写任何代码ManualResetEvent等.

如何适合BlockingCollection我的任务,可能你可以推荐别的东西?

sll*_*sll 11

BlockingCollectionclass实现了IProducerConsumerCollection接口,完全符合您的要求.

您可以创建两个任务,一个用于异步生产者,另一个用作消费者工作者.以前会添加项目,BlockingCollection后者只要新的FIFO顺序可用就会消耗.

使用TPL任务BlockingCollection的生产者 - 消费者样本应用程序:

class ProducerConsumer
{
    private static BlockingCollection<string> queue = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        Start();
    }

    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());

        Task.WaitAll(producerWorker, consumerWorker);
    }

    private static void RunProducer()
    {
        int itemsCount = 100;

        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250);
        }
    }

    private static void RunConsumer()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
           Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

IProducerConsumerCollection:

定义操作用于生产者/消费者使用的线程安全集合的方法.此接口为生产者/消费者集合提供统一表示,以便更高级别的抽象(如System.Collections.Concurrent.BlockingCollection(Of T))可以将集合用作底层存储机制.

  • 我看到Start()方法等待完成两个任务.显然,`producerWorker`任务将结束,但`consumerWorker`永远不会结束.应该考虑到这一点吗? (2认同)