这对于并发集/队列组合看起来是一种合理的方法吗?

Dan*_*Tao 4 .net queue concurrency multithreading set

更新:正如Brian指出的那样,我最初的想法确实存在并发问题.这个ConcurrentDictionary<TKey, TValue>.AddOrUpdate方法的签名有点模糊,可以让一个懒惰的思想家(比如我自己)相信所有东西 - 集合添加以及队列推送 - 会以某种方式同时发生,原子地(即,神奇地) ).

回想起来,有这种期望对我来说是愚蠢的.实际上,无论执行情况如何AddOrUpdate,都应该很清楚,我的原始想法中仍会存在竞争条件,正如Brian所指出的那样:在添加到集合之前推送到队列,因此可能发生以下事件序列:

  1. 项目被推送到队列
  2. 项目从队列中弹出
  3. 项目(未)从集合中删除
  4. 项目已添加到设置中

上述序列将导致集合中的项目不在队列中,从而有效地将该项目列入数据结构的黑名单.

现在,我想了一会儿,我开始认为以下方法可以解决这些问题:

public bool Enqueue(T item)
{
    // This should:
    // 1. return true only when the item is first added to the set
    // 2. subsequently return false as long as the item is in the set;
    //    and it will not be removed until after it's popped
    if (_set.TryAdd(item, true))
    {
        _queue.Enqueue(item);
        return true;
    }

    return false;
}
Run Code Online (Sandbox Code Playgroud)

以这种方式构造它,Enqueue调用只发生一次 - 项目集合之后.因此,队列中的重复项应该不是问题.并且似乎由于队列操作被设置操作"预订" - 即,只有项目被添加到集合之后才推送它,并且它从集合中移除之前弹出它 - 上面列出的有问题的事件序列不应该发生.

人们怎么想?难道这可以解决这个问题吗?(就像布莱恩一样,我倾向于怀疑自己并猜测答案是否定的,我再次错过了一些东西.但是,嘿,如果它很容易就不会是一个有趣的挑战吧?)


我确实在SO上看到了类似的问题,但令人惊讶的是(考虑到.NET这个网站的重量程度如何),它们似乎都是针对Java的.

我本质上需要一个线程安全的set/queue组合类.换句话说,它应该是一个不允许重复的FIFO集合(因此,如果同一项已经在队列中,后续Enqueue调用将返回false,直到该项从队列中弹出).

我意识到我可以通过一个简单的方法很容易地实现这一点,HashSet<T>Queue<T>在所有必要的地方锁定.然而,我感兴趣的是与完成它ConcurrentDictionary<TKey, TValue>ConcurrentQueue<T>类从.NET 4.0(也可为Rx扩展.NET 3.5,这是我使用的是什么样的一部分),我的理解是莫名其妙无锁收藏*.

我的基本计划是实现这样的集合:

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue;
    ConcurrentDictionary<T, bool> _set;

    public ConcurrentSetQueue(IEqualityComparer<T> comparer)
    {
        _queue = new ConcurrentQueue<T>();
        _set = new ConcurrentDictionary<T, bool>(comparer);
    }

    public bool Enqueue(T item)
    {
        // This should:
        // 1. if the key is not present, enqueue the item and return true
        // 2. if the key is already present, do nothing and return false
        return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
    }

    private bool EnqueueFirst(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    private bool EnqueueSecond(T item, bool dummyFlag)
    {
        return false;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            // Another thread could come along here, attempt to enqueue, and
            // fail; however, this seems like an acceptable scenario since the
            // item shouldn't really be considered "popped" until it's been
            // removed from both the queue and the dictionary.
            bool flag;
            _set.TryRemove(item, out flag);

            return true;
        }

        return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

我是否正确地想到了这一点?从表面上看,我在上面写的这个基本概念中看不到任何明显的错误.但也许我忽视了一些事情.或者使用ConcurrentQueue<T>带有a的a ConcurrentDictionary<T, bool>实际上并不是明智的选择,原因是我没有想到的原因.或许其他人已经在某个经过实战验证的图书馆中实现了这个想法,我应该只使用它.

任何有关此主题的想法或有用信息将不胜感激!

*这是否严格准确,我不知道; 但是性能测试已经向我表明,它们的表现优于使用许多消费者线程锁定的可比手工收藏品.

Bri*_*eon 5

简称是否定的,问题中提供的代码不是线程安全的.

MSDN文档在AddOrUpdate方法上相当稀疏,所以我看了一下AddOrUpdateReflector 中的方法.这是基本算法(由于法律原因,我不发布Reflector输出,并且很容易自己做).

TValue value;
do
{
  if (!TryGetValue(...))
  {
    value = AddValueFactoryDelegate(key);
    if (!TryAddInternal(...))
    {
      continue;
    }
    return value;
  }
  value = UpdateValueFactoryDelegate(key);
} 
while (!TryUpdate(...))
return value;
Run Code Online (Sandbox Code Playgroud)

很明显,AddValueFactoryDelegate并且UpdateValueFactoryDelegate可以执行多次.这里不需要进一步解释.应该很明显这会如何破坏你的代码.我实际上有点震惊,代表们可以多次执行.该文件没有提到这一点.您会认为这将是一个非常重要的观点,因此呼叫者知道避免传递具有副作用的代表(就像您的情况一样).

但即使代表们只保留执行一次仍然存在问题.通过将Enqueue方法替换为方法的内容,可以很容易地显示问题序列AddOrUpdate.该AddValueFactoryDelegate会执行并插入一项_queue,但线程可以通过上下文切换项目新增至前无谓的干扰_set.然后第二个线程可以调用您的TryDequeue方法并从中提取该项_queue,但_set由于它尚未存在而无法将其删除.

更新:

好吧,我认为不可能让它发挥作用.ConcurrentQueue缺少一项重要的操作.我相信你需要一个等效的CASTryDequeue方法.如果存在这样的操作,那么我认为以下代码是正确的.我使用神秘的TryDequeueCas方法接受一个比较值,该值用作条件,当且仅当队列中的顶部项等于比较值时,才能原子地执行此操作.这个想法与该Interlocked.CompareExchange方法中使用的完全相同.

请注意代码如何使用bool的值ConcurrentDictionary作为一个"虚拟"锁同步队列和字典的协调.数据结构还包含CAS等效操作TryUpdate,该操作用于获取和释放此"虚拟"锁.并且因为锁是"虚拟的"并且实际上不阻止并发访问,所以whileTryDequeue方法中的循环是强制性的.这符合CAS操作的规范模式,因为它们通常在循环中执行,直到它们成功.

该代码还使用.NET 4.0样式的try-finally模式来获取锁定获取语义,以帮助防止由带外(异步)异常引起的问题.

注意:同样,代码使用神话ConcurrentQueue.TryDequeueCas方法.

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();

    public ConcurrentSetQueue()
    {
    }

    public bool Enqueue(T item)
    {
        bool acquired = false;
        try
        {
            acquired = _set.TryAdd(item, true);
            if (acquired)
            {
                _queue.Enqueue(item);
                return true;
            }
            return false;
        }
        finally
        {
            if (acquired) _set.TryUpdate(item, false, true);
        }
    }

    public bool TryDequeue(out T item)
    {
        while (_queue.TryPeek(out item))
        {
            bool acquired = false;
            try
            {
                acquired = _set.TryUpdate(item, true, false);
                if (acquired)
                {
                    if (_queue.TryDequeueCas(out item, item))
                    {
                        return true;
                    }
                }
            }
            finally
            {
                if (acquired) _set.TryRemove(item, out acquired);
            }
        }
        item = default(T);
        return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

更新2:

参考您的修改通知,与我的相比,它是多么相似.事实上,如果你从我的变体中删除所有的绒毛,那么该Enqueue方法具有完全相同的语句序列.

我更担心的TryDequeue是,这就是为什么我添加了"虚拟"锁定概念,这在我的实现中需要很多额外的东西.我特别担心访问数据结构的相反顺序(字典然后在Enqueue方法中排队,但是队列中的队列然后是字典TryDequeue)但是,我对你修改过的方法越多,我就越喜欢它.我现在认为这是因为反向访问顺序是安全的!