如何访问阻塞集合的基础默认并发队列

Gul*_*llu 6 c# multithreading .net-4.0 task-parallel-library

我有多个生产者和一个消费者.但是,如果队列中存在尚未消耗的内容,则生产者不应再次对其进行排队.(唯一没有重复的阻塞集合使用默认的并发队列)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)
Run Code Online (Sandbox Code Playgroud)

但是,阻塞coll没有contains方法,也没有提供任何类型的trypeek()方法.我如何访问底层并发队列,以便我可以做类似的事情

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)
Run Code Online (Sandbox Code Playgroud)

尾巴旋转.请帮忙.谢谢

Bri*_*eon 8

这是个有趣的问题.这是我第一次看到有人要求阻止重复的阻塞队列.奇怪的是,我发现BCL中已经存在的东西并不像你想要的那样.我说这是奇怪的,因为BlockingCollection可以接受a IProducerConsumerCollection作为底层集合,其具有TryAdd被广告的方法,当检测到重复时能够失败.问题是我没有看到具体的实现,IProducerConsumerCollection这可以防止重复.至少我们可以写自己的.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

既然我们有IProducerConsumerCollection不接受重复的东西,我们可以像这样使用它:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

你可能不喜欢我的实现NoDuplicatesConcurrentQueue.ConcurrentQueue如果您认为需要TPL集合提供的低锁性能,您当然可以自由地实现自己的使用或其他任何东西.

更新:

我今天早上能够测试代码.有一些好消息和坏消息.好消息是,这将在技术上有效.坏消息是您可能不想这样做,因为BlockingCollection.TryAdd拦截了底层IProducerConsumerCollection.TryAdd方法的返回值,并在false检测到时抛出异常.是的,这是对的.它false不像您期望的那样返回,而是生成异常.我必须说实话,这既令人惊讶又荒谬.TryXXX方法的重点是它们不应该抛出异常.我非常失望.


Eug*_*sky 5

除了Brian Gideon在Update之后提到的警告之外,他的解决方案还存在以下性能问题:

  • queue.Contains(item)随着队列的增长,队列()上的O(n)操作对性能产生严重影响
  • 锁限制并发(他提到)

以下代码改进了Brian的解决方案

  • 使用哈希集进行O(1)查找
  • 组合System.Collections.Concurrent命名空间中的2个数据结构

NB因为没有ConcurrentHashSet,我正在使用a ConcurrentDictionary,忽略了这些值.

在这种罕见的情况下,幸运的是,可以简单地从多个更简单的并发数据结构中构建更复杂的并发数据结构,而无需添加锁.这两个并发数据结构的操作顺序非常重要.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}
Run Code Online (Sandbox Code Playgroud)

NB查看此问题的另一种方法:您需要一个保留插入顺序的集合.


Ed *_*tes 1

我建议使用锁来实现您的操作,这样您就不会以破坏项目的方式读写项目,从而使它们成为原子的。例如,对于任何 IEnumerable:

object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}
Run Code Online (Sandbox Code Playgroud)