Dan*_*Tao 4 .net queue concurrency multithreading set
更新:正如Brian指出的那样,我最初的想法确实存在并发问题.这个ConcurrentDictionary<TKey, TValue>.AddOrUpdate
方法的签名有点模糊,可以让一个懒惰的思想家(比如我自己)相信所有东西 - 集合添加以及队列推送 - 会以某种方式同时发生,原子地(即,神奇地) ).
回想起来,有这种期望对我来说是愚蠢的.实际上,无论执行情况如何AddOrUpdate
,都应该很清楚,我的原始想法中仍会存在竞争条件,正如Brian所指出的那样:在添加到集合之前推送到队列,因此可能发生以下事件序列:
上述序列将导致集合中的项目不在队列中,从而有效地将该项目列入数据结构的黑名单.
现在,我想了一会儿,我开始认为以下方法可以解决这些问题:
public bool Enqueue(T item)
{
// This should:
// 1. return true only when the item is first added to the set
// 2. subsequently return false as long as the item is in the set;
// and it will not be removed until after it's popped
if (_set.TryAdd(item, true))
{
_queue.Enqueue(item);
return true;
}
return false;
}
Run Code Online (Sandbox Code Playgroud)
以这种方式构造它,Enqueue
调用只发生一次 - 在项目集合之后.因此,队列中的重复项应该不是问题.并且似乎由于队列操作被设置操作"预订" - 即,只有在项目被添加到集合之后才推送它,并且在它从集合中移除之前弹出它 - 上面列出的有问题的事件序列不应该发生.
人们怎么想?难道这可以解决这个问题吗?(就像布莱恩一样,我倾向于怀疑自己并猜测答案是否定的,我再次错过了一些东西.但是,嘿,如果它很容易就不会是一个有趣的挑战吧?)
我确实在SO上看到了类似的问题,但令人惊讶的是(考虑到.NET这个网站的重量程度如何),它们似乎都是针对Java的.
我本质上需要一个线程安全的set/queue组合类.换句话说,它应该是一个不允许重复的FIFO集合(因此,如果同一项已经在队列中,后续Enqueue
调用将返回false,直到该项从队列中弹出).
我意识到我可以通过一个简单的方法很容易地实现这一点,HashSet<T>
并Queue<T>
在所有必要的地方锁定.然而,我感兴趣的是与完成它ConcurrentDictionary<TKey, TValue>
和ConcurrentQueue<T>
类从.NET 4.0(也可为Rx扩展.NET 3.5,这是我使用的是什么样的一部分),我的理解是莫名其妙无锁收藏*.
我的基本计划是实现这样的集合:
class ConcurrentSetQueue<T>
{
ConcurrentQueue<T> _queue;
ConcurrentDictionary<T, bool> _set;
public ConcurrentSetQueue(IEqualityComparer<T> comparer)
{
_queue = new ConcurrentQueue<T>();
_set = new ConcurrentDictionary<T, bool>(comparer);
}
public bool Enqueue(T item)
{
// This should:
// 1. if the key is not present, enqueue the item and return true
// 2. if the key is already present, do nothing and return false
return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
}
private bool EnqueueFirst(T item)
{
_queue.Enqueue(item);
return true;
}
private bool EnqueueSecond(T item, bool dummyFlag)
{
return false;
}
public bool TryDequeue(out T item)
{
if (_queue.TryDequeue(out item))
{
// Another thread could come along here, attempt to enqueue, and
// fail; however, this seems like an acceptable scenario since the
// item shouldn't really be considered "popped" until it's been
// removed from both the queue and the dictionary.
bool flag;
_set.TryRemove(item, out flag);
return true;
}
return false;
}
}
Run Code Online (Sandbox Code Playgroud)
我是否正确地想到了这一点?从表面上看,我在上面写的这个基本概念中看不到任何明显的错误.但也许我忽视了一些事情.或者使用ConcurrentQueue<T>
带有a的a ConcurrentDictionary<T, bool>
实际上并不是明智的选择,原因是我没有想到的原因.或许其他人已经在某个经过实战验证的图书馆中实现了这个想法,我应该只使用它.
任何有关此主题的想法或有用信息将不胜感激!
*这是否严格准确,我不知道; 但是性能测试已经向我表明,它们的表现优于使用许多消费者线程锁定的可比手工收藏品.
简称是否定的,问题中提供的代码不是线程安全的.
MSDN文档在AddOrUpdate
方法上相当稀疏,所以我看了一下AddOrUpdate
Reflector 中的方法.这是基本算法(由于法律原因,我不发布Reflector输出,并且很容易自己做).
TValue value;
do
{
if (!TryGetValue(...))
{
value = AddValueFactoryDelegate(key);
if (!TryAddInternal(...))
{
continue;
}
return value;
}
value = UpdateValueFactoryDelegate(key);
}
while (!TryUpdate(...))
return value;
Run Code Online (Sandbox Code Playgroud)
很明显,AddValueFactoryDelegate
并且UpdateValueFactoryDelegate
可以执行多次.这里不需要进一步解释.应该很明显这会如何破坏你的代码.我实际上有点震惊,代表们可以多次执行.该文件没有提到这一点.您会认为这将是一个非常重要的观点,因此呼叫者知道避免传递具有副作用的代表(就像您的情况一样).
但即使代表们只保留执行一次仍然存在问题.通过将Enqueue
方法替换为方法的内容,可以很容易地显示问题序列AddOrUpdate
.该AddValueFactoryDelegate
会执行并插入一项_queue
,但线程可以通过上下文切换项目新增至前无谓的干扰_set
.然后第二个线程可以调用您的TryDequeue
方法并从中提取该项_queue
,但_set
由于它尚未存在而无法将其删除.
更新:
好吧,我认为不可能让它发挥作用.ConcurrentQueue
缺少一项重要的操作.我相信你需要一个等效的CASTryDequeue
方法.如果存在这样的操作,那么我认为以下代码是正确的.我使用神秘的TryDequeueCas
方法接受一个比较值,该值用作条件,当且仅当队列中的顶部项等于比较值时,才能原子地执行此操作.这个想法与该Interlocked.CompareExchange
方法中使用的完全相同.
请注意代码如何使用bool
的值ConcurrentDictionary
作为一个"虚拟"锁同步队列和字典的协调.数据结构还包含CAS等效操作TryUpdate
,该操作用于获取和释放此"虚拟"锁.并且因为锁是"虚拟的"并且实际上不阻止并发访问,所以while
该TryDequeue
方法中的循环是强制性的.这符合CAS操作的规范模式,因为它们通常在循环中执行,直到它们成功.
该代码还使用.NET 4.0样式的try-finally模式来获取锁定获取语义,以帮助防止由带外(异步)异常引起的问题.
注意:同样,代码使用神话ConcurrentQueue.TryDequeueCas
方法.
class ConcurrentSetQueue<T>
{
ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();
public ConcurrentSetQueue()
{
}
public bool Enqueue(T item)
{
bool acquired = false;
try
{
acquired = _set.TryAdd(item, true);
if (acquired)
{
_queue.Enqueue(item);
return true;
}
return false;
}
finally
{
if (acquired) _set.TryUpdate(item, false, true);
}
}
public bool TryDequeue(out T item)
{
while (_queue.TryPeek(out item))
{
bool acquired = false;
try
{
acquired = _set.TryUpdate(item, true, false);
if (acquired)
{
if (_queue.TryDequeueCas(out item, item))
{
return true;
}
}
}
finally
{
if (acquired) _set.TryRemove(item, out acquired);
}
}
item = default(T);
return false;
}
}
Run Code Online (Sandbox Code Playgroud)
更新2:
参考您的修改通知,与我的相比,它是多么相似.事实上,如果你从我的变体中删除所有的绒毛,那么该Enqueue
方法具有完全相同的语句序列.
我更担心的TryDequeue
是,这就是为什么我添加了"虚拟"锁定概念,这在我的实现中需要很多额外的东西.我特别担心访问数据结构的相反顺序(字典然后在Enqueue
方法中排队,但是队列中的队列然后是字典TryDequeue
)但是,我对你修改过的方法越多,我就越喜欢它.我现在认为这是因为反向访问顺序是安全的!
归档时间: |
|
查看次数: |
918 次 |
最近记录: |