阻止旧数据的BlockingCollection

Hug*_*une 12 c# collections multithreading producer-consumer thread-safety

我有一个BlockingCollection.生产者任务向其添加项目,消费者任务删除项目.

现在我想限制集合中的项目数,如果添加更多项目,则自动丢弃旧数据.该集合永远不应包含多于N最近添加的项目.

因此,如果生产者添加新项目的速度快于消费者删除它们,我希望消费者只处理最新的项目.

我可以BlockingCollection在其构造函数中限制a的大小,但当然这只是意味着它在添加更多项时会阻塞,而不是它会删除旧项.

(我不希望生产者端阻塞,只有消费者方在从空集合中检索项目时才会阻塞.)

我目前的解决方案是黑客攻击,只适用于1的大小限制:(
而且我不确定它是否可靠.)

// My consumer task:
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
    var lastItem = item;
    var lastItemTmp = item;
    while (blockingCollection.TryTake(out lastItemTmp))
           lastItem = lastItemTmp;
    // Now lastItem contains the most recent item in the collection, 
    // and older items have been discarded.
    // Proceed consuming lastItem ...
}
Run Code Online (Sandbox Code Playgroud)

有更清洁的解决方案吗?

Jim*_*hel 8

这样做:

void AddItemToQueue(MyClass item)
{
    while (!queue.TryAdd(item))
    {
        MyClass trash;
        queue.TryTake(out trash);
    }
}
Run Code Online (Sandbox Code Playgroud)

如果在尝试添加项目时队列已满,则会从队列中删除项目.它的使用TryTake是因为它有可能(不太可能,但可能)某个其他线程可能已经从队列中删除了最后一个项目,然后才有机会获取一个.

当然,这假定您在构造时指定了项目数量的限制BlockingCollection.

另一种方法是,尽管它涉及的更多,但是要创建自己的循环队列类,并让它实现IProducerConsumerCollection接口.然后,您可以使用该类的实例作为您的支持集合BlockingCollection.实现循环队列并不是特别困难,尽管边缘情况很难实现.而且你必须使它成为一个并发的数据结构,尽管用锁很容易.

如果您不希望队列经常溢出,或者队列的流量非常低(即每秒没有被击中数千次),那么我的初步建议将做您想做的事情并且不存在性能问题.如果存在性能问题,则循环队列就是解决方案.

  • 循环队列解决方案不适用于BlockingCollection。BlockingCollection会独立跟踪内部集合中包含多少个项目,并且如果内部集合的计数与BlockingCollection认为应该存在的计数不同,则会发生异常。我曾经解决过这个问题,为什么要让内部队列将过时的条目替换为null,然后让使用者过滤这些条目。它可以工作,但是会将抽象泄漏给用户。 (2认同)