Ala*_*lan 193 .net c# parallel-processing task-parallel-library
我很高兴看到System.Collections.Concurrent
.Net 4.0中的新命名空间,非常好!我见过ConcurrentDictionary
,ConcurrentQueue
和ConcurrentStack
,ConcurrentBag
和BlockingCollection
.
似乎神秘缺失的一件事是ConcurrentList<T>
.我是否必须自己写(或从网上下载:))?
我错过了一些明显的东西吗?
Dan*_*Tao 162
我试了一会儿(也是:在GitHub上).我的实现有一些问题,我不会在这里讨论.让我告诉你,更重要的是,我学到了什么.
首先,你无法完全实现IList<T>
无锁和线程安全.尤其是,随机插入和删除都没有去上班,除非你也忘了O(1)随机存取(即,除非你"欺骗",只是用某种链表,让索引吸吮).
我想可能是值得的是线程安全的,集有限的IList<T>
:尤其是,一个将允许Add
并提供随机只读通过索引访问(但没有Insert
,RemoveAt
等,还没有随机写入访问).
这是我ConcurrentList<T>
实施的目标.但是当我在多线程场景中测试其性能时,我发现简单地同步添加到a List<T>
更快.基本上,添加到a List<T>
已经是闪电般快速; 所涉及的计算步骤的复杂性微乎其微(增加索引并分配给数组中的元素; 实际上就是这样).你需要大量的并发写入才能看到任何类型的锁争用; 即使这样,每次写入的平均性能仍然会击败更昂贵但无锁定的实现ConcurrentList<T>
.
在列表的内部数组需要调整自身大小的相对罕见的情况下,您需要支付少量费用.所以最终我得出结论,这是一个利基场景,其中只添加ConcurrentList<T>
集合类型是有意义的:当您希望保证在每次调用时添加元素的低开销(因此,与摊销的性能目标相反).
它根本不像你想象的那样有用.
Hen*_*man 37
你会用什么ConcurrentList?
线程世界中随机访问容器的概念并不像它看起来那么有用.该声明
if (i < MyConcurrentList.Count)
x = MyConcurrentList[i];
Run Code Online (Sandbox Code Playgroud)
作为一个整体仍然不是线程安全的.
而不是创建ConcurrentList,尝试用那里的东西构建解决方案.最常见的类是ConcurrentBag,尤其是BlockingCollection.
Bri*_*oth 19
尽管已经提供了很好的答案,但有时候我只想要一个线程安全的IList.没有任何进步或幻想.在许多情况下,性能很重要,但有时候并不是一个问题.是的,没有"TryGetValue"等方法总会遇到挑战,但大多数情况下我只想要一些我可以枚举的东西,而不必担心把锁放在一切.是的,有人可能会在我的实现中发现一些可能导致死锁或其他东西的"错误"(我想)但是老实说:当谈到多线程时,如果你没有正确编写代码,那么无论如何都要陷入僵局.考虑到这一点,我决定创建一个简单的ConcurrentList实现来提供这些基本需求.
它的价值在于:我做了一个基本测试,将10,000,000个项目添加到常规List和ConcurrentList中,结果如下:
列表完成:7793毫秒.并发完成:8064毫秒.
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
Run Code Online (Sandbox Code Playgroud)
Ste*_*ary 11
ConcurrentList
(作为可调整大小的数组,而不是链接列表)使用非阻塞操作不容易编写.它的API不能很好地转换为"并发"版本.
Ste*_*enz 10
没有ConcurrentList的原因是因为它从根本上无法编写.原因是IList中的几个重要操作依赖于索引,而这种操作只是简单无效.例如:
int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");
Run Code Online (Sandbox Code Playgroud)
作者要追求的效果是在"cat"之前插入"dog",但在多线程环境中,任何事情都可能发生在这两行代码之间的列表中.例如,另一个线程可能会做list.RemoveAt(0)
,将整个列表向左移动,但至关重要的是,catIndex不会改变.这里的影响是,该Insert
操作将真正把"狗" 后,猫,之前不是.
您看到作为这个问题的"答案"提供的几个实现是有意义的,但如上所示,它们不提供可靠的结果.如果你真的想在多线程环境列表类似的语义,你不能把锁那里内部列表实现方法.您必须确保您使用的任何索引完全位于锁的上下文中.结果是你可以在具有正确锁定的多线程环境中使用List,但是列表本身不能存在于那个世界中.
如果您认为需要并发列表,那么实际上只有两种可能性:
如果你有一个ConcurrentBag并且你需要将它作为IList传递,那么你有一个问题,因为你所调用的方法已经指定他们可能会尝试像我上面用cat做的那样做狗.在大多数世界中,这意味着您调用的方法根本无法在多线程环境中工作.这意味着你要么重构它,要么它是,或者,如果你不能,你将不得不非常小心地处理它.您几乎肯定会要求使用自己的锁创建自己的集合,并在锁中调用违规方法.
System.Collections.Generic.List<t>
对于多个读者来说已经是线程安全的。试图使其对多个编写者来说线程安全是没有意义的。(出于亨克和斯蒂芬已经提到的原因)
如果读取数量大大超过写入数量,或者(非常频繁)写入是非并发的,则写入时复制方法可能是合适的.
下面的实现是
var snap = _list; snap[snap.Count - 1];
永远不会(当然,除了一个空列表)抛出,你也可以免费获得带有快照语义的线程安全枚举..我多么喜欢不变性!对于copy-on-write工作,你必须保持你的数据结构有效地不可变,即在你将它们提供给其他线程后,不允许任何人更改它们.当你想要修改时,你
码
static class CopyOnWriteSwapper
{
public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
where T : class
{
while (true)
{
var objBefore = Volatile.Read(ref obj);
var newObj = cloner(objBefore);
op(newObj);
if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
return;
}
}
}
Run Code Online (Sandbox Code Playgroud)
用法
CopyOnWriteSwapper.Swap(ref _myList,
orig => new List<string>(orig),
clone => clone.Add("asdf"));
Run Code Online (Sandbox Code Playgroud)
如果你需要更多的性能,它将有助于取消方法,例如为你想要的每种类型的修改(添加,删除,...)创建一个方法,并硬编码函数指针cloner
和op
.
NB#1您有责任确保没有人修改(据称)不可变数据结构.我们无法在通用实现中做任何事情来防止这种情况,但是在专门研究时List<T>
,您可以使用List.AsReadOnly()防止修改
NB#2注意列表中的值.上面的写入方法复制只保护它们的列表成员资格,但是如果你没有放置字符串,而是放入其他一些可变对象,则必须注意线程安全(例如锁定).但这与该解决方案正交,并且例如可以容易地使用可变值的锁定而没有问题.你只需要知道它.
NB#3如果您的数据结构庞大并且经常对其进行修改,那么就内存消耗和所涉及的CPU复制成本而言,copy-all-on-write方法可能会受到限制.在这种情况下,您可能希望使用MS的不可变集合.