具有固定大小FIFO队列的生产者/消费者模式

bmt*_*033 6 .net c# queue producer-consumer concurrent-collections

我需要在固定大小的FIFO队列周围实现生产者/消费者模式.我认为围绕ConcurrentQueue的包装类可能适用于此,但我不完全确定(我之前从未使用过ConcurrentQueue).这种扭曲是队列只需要保存固定数量的项目(在我的例子中是字符串).我的应用程序将有一个生产者任务/线程和一个消费者任务/线程.当我的消费者任务运行时,它需要在那个时刻出现队列中存在的所有项目并对其进行处理.

对于它的价值,我的消费者处理排队的项目只不过是通过SOAP将它们上传到一个不是100%可靠的网络应用程序.如果无法建立连接或调用SOAP调用失败,我应该丢弃这些项目并返回队列以获取更多信息.由于SOAP的开销,我试图最大化队列中可以在一次SOAP调用中发送的项目数.

有时,我的制作人可能会比我的消费者能够删除和处理它们更快地添加项目.如果队列已满并且我的生产者需要添加另一个项目,我需要将新项目排队,然后将最旧的项目出列,以便队列的大小保持固定.基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目不会被消耗,因为我的消费者当前正在处理以前的项目).

关于生产者如果队列中的项目是固定的那样保留数字,我从这个问题中发现了一个潜在的想法:

固定大小队列,在新的enques上自动将旧值出列

我目前在ConcurrentQueue周围使用一个包装类(基于该答案)和Enqueue()方法,如下所示:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我创建了这个类的实例,对队列有一个大小限制,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
Run Code Online (Sandbox Code Playgroud)

我启动了我的生产者任务,它开始填充队列.我的Enqueue()方法中的代码似乎在添加项目时从队列中删除最旧的项目时正常工作导致队列计数超过最大大小.现在我需要我的消费者任务来将项目出列并处理它们,但这就是我的大脑混淆的地方.为我的消费者实现Dequeue方法的最佳方法是什么,它将在某个时刻获取队列的快照并将所有项目出列以进行处理(生产者可能仍然在此过程中将项目添加到队列中)?

Kei*_*thS 7

简单地说,ConcurrentQueue有一个"ToArray"方法,当输入时,它将锁定集合并生成队列中所有当前项的"快照".如果您希望为您的消费者提供一系列工作,您可以锁定入队方法所具有的相同对象,调用ToArray(),然后旋转while(!queue.IsEmpty) queue.TryDequeue(out trash)循环以清除队列,然后返回您提取的数组.

这将是你的GetAll()方法:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}
Run Code Online (Sandbox Code Playgroud)

由于你必须清除队列,你可以简单地组合这两个操作; 创建一个适当大小的数组(使用queue.Count),然后在队列不为空时,在返回之前将一个项目出列并将其放入数组中.

现在,这是具体问题的答案.我现在必须在良心上穿上我的CodeReview.SE帽子并指出一些事情:

  • 永远不要使用lock(this).您永远不会知道其他对象可能将您的对象用作锁定焦点,因此当对象从内部锁定自身时会被阻止.最佳实践是锁定一个私有范围的对象实例,通常是一个被锁定的实例:private readonly object syncObj = new object();

  • 既然你无论如何都要锁定包装器的关键部分,我会使用普通的List<T>而不是并发的集合.访问速度更快,更容易清理,因此您可以比ConcurrentQueue允许的更简单地完成您所做的工作.要排队,请在索引零之前锁定同步对象Insert(),然后使用RemoveRange()从索引大小中删除任何项目到列表的当前计数.要出列,请锁定同一个同步对象,调用myList.ToArray()(来自Linq命名空间;与ConcurrentQueue完全相同),然后在返回数组之前调用myList.Clear().不能简单:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 您似乎明白使用此模型将您排队的物品扔掉了.这通常不是一件好事,但我愿意给你怀疑的好处.但是,我会说使用BlockingCollection有一种无损的方法来实现这一点.BlockingCollection包装任何IProducerConsumerCollection,包括大多数System.Collections.Concurrent类,并允许您指定队列的最大容量.然后,该集合将阻止任何尝试从空队列或任何尝试添加到完整队列的线程出队的线程,直到已添加或删除项目,以便有可能获得某些内容或插入空间.这是实现具有最大大小的生产者 - 消费者队列的最佳方式,或者是否需要"轮询"以查看是否有供消费者使用的内容的最佳方式.如果你走这条路,只有消费者扔掉的东西才会被扔掉; 消费者将看到生产者投入的所有行,并对每个行做出自己的决定.


ole*_*sii 5

你不想lockthis.看看为什么锁(这个){...}不好?更多细节.

这段代码

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}
Run Code Online (Sandbox Code Playgroud)

建议您需要以某种方式等待通知消费者该项目的可用性.在这种情况下,请考虑使用BlockingCollection<T>.