ConcurrentQueue .Net:多线程消费者

Kal*_*lol 4 .net c# multithreading concurrent-queue

我有一个非常基本的问题,更多的是关于概念ConcurrentQueue.队列是FIFO.当多个线程开始访问它时,我们如何保证FIFO?假设,我已经加入Apple,Oranges,Lemon,PeachApricot-的顺序.第一个TryTake应该回来Apple.但是当多个线程开始提供自己的TryTake请求时会发生什么?当一个线程Lemon甚至可以在另一个线程返回之前返回时,是不是有可能Apple?我假设其他项目也将被返回,直到队列为空.但这些回报是否会围绕FIFO的基本原则进行管理?

Sco*_*nen 7

ConcurrentQueue它本身的行为总是FIFO.

当我们谈论线程"返回"项目时ConcurrentQueue,我们谈论的是一个涉及将项目出列执行某种操作的操作,该操作使您能够观察已经出列的内容.无论是打印输出还是将该项添加到另一个列表,在检查之前,您实际上并不知道哪个项已从队列中取出.

虽然队列本身是FIFO,但您无法预测其他事件(例如检查出列项目)将发生的顺序.这些项目将出列FIFO,但您可能会或可能无法按顺序观察队列中出现的内容.不同的线程可能不会以与从队列中删除项目完全相同的顺序执行该检查或输出.

换句话说,它将发生FIFO但它可能或可能不总是看起来像它.ConcurrentQueue如果处理项目的确切顺序至关重要,您不希望同时读取.

如果你要对此进行测试(我即将写一些东西),那么你可能会在大多数情况下找到以精确的FIFO顺序处理的项目,但之后每隔一段时间他们就不会.


这是一个控制台应用程序.它会

  • ConcurrentQueue单线程中插入1-5000中的数字.
  • 执行并发操作以使每个项目出列并将它们移动到另一个项目ConcurrentQueue.这是"多线程消费者".
  • 读取第二个队列中的项目(再次单线程)并报告任何不按顺序的数字.

很多时候我运行它并没有任何不按顺序.但是大约有50%的时间只报告了几个数字.因此,如果您指望按原始序列处理所有数字,那么大多数情况下几乎所有数字都会发生.但那不会.如果你不关心确切的顺序,那就没关系了,但是如果你这样做的话,那就是错误和不可预测的.

结论 - 不依赖于多线程操作的确切顺序.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace ConcurrentQueueExperiment
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputQueue = new ConcurrentQueue<int>();
            var outputQueue = new ConcurrentQueue<int>();
            Enumerable.Range(1,5000).ToList().ForEach(inputQueue.Enqueue);
            while (inputQueue.Any())
            {
                Task.Factory.StartNew(() =>
                {
                    int dequeued;
                    if (inputQueue.TryDequeue(out dequeued))
                    {
                        outputQueue.Enqueue(dequeued);
                    }
                });
            }
            int output = 0;
            var previous = 0;
            while (outputQueue.TryDequeue(out output))
            {
                if(output!=previous+1)
                    Console.WriteLine("Out of sequence: {0}, {1}", previous, output);
                previous = output;
            }
            Console.WriteLine("Done!");
            Console.ReadLine();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)