Mat*_*and 2 c# multithreading pool
我正在尝试实现一些管理资源池的东西,以便调用代码可以请求一个对象,并且如果可用,将从池中获取一个对象,否则将使其等待。但是,我无法使同步正常工作。我的池类中的内容是这样的(其中初始autoEvent设置AutoResetEvent为信号:
public Foo GetFooFromPool()
{
autoEvent.WaitOne();
var foo = Pool.FirstOrDefault(p => !p.InUse);
if (foo != null)
{
foo.InUse = true;
autoEvent.Set();
return foo;
}
else if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
else
{
return GetFooFromPool();
}
}
public void ReleaseFoo(Foo p)
{
p.InUse = false;
autoEvent.Set();
}
Run Code Online (Sandbox Code Playgroud)
这个想法是,当您调用 时GetFooFromPool,您会等到收到信号,然后尝试查找Foo未使用的现有对象。如果您找到一个,我们将其设置为InUse,然后发出一个信号,以便其他线程可以继续进行。如果找不到,我们会检查池是否已满。如果没有,我们创建一个新的Foo,将其添加到池中并再次发出信号。如果这两个条件都不满足,我们将再次致电等待GetFooFromPool。
现在,ReleaseFoo我们只需将其设置InUse回 false,并向等待的下一个线程GetFooFromPool(如果有)发出信号,以尝试获取Foo.
问题似乎出在我管理池的大小上。容量为5,我最终得到6 Foos。我可以看到在我的调试行中count 0出现了几次,并且count 1也可能出现了几次。很明显,我有多个线程进入该块,但据我所知,它们不应该能够进入该块。
我在这里做错了什么?
编辑:像这样的双重检查锁:
else if (Pool.Count < Capacity)
{
lock(locker)
{
if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
}
}
Run Code Online (Sandbox Code Playgroud)
似乎确实解决了问题,但我不确定这是最优雅的方法。
正如评论中已经提到的,计数信号量是你的朋友。将其与并发堆栈结合起来,您就得到了一个简单、线程安全的实现,您仍然可以延迟分配池项。
下面的基本实现提供了这种方法的示例。请注意,这里的另一个优点是您不需要使用成员InUse作为跟踪内容的标志来“污染”池项目。
请注意,作为微优化,在这种情况下,堆栈优于队列,因为它将提供从池中最近返回的实例,该实例可能仍位于 L1 缓存中。
public class GenericConcurrentPool<T> : IDisposable where T : class
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentStack<T> _itemsStack;
private readonly Action<T> _onDisposeItem;
private readonly Func<T> _factory;
public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
{
_itemsStack = new ConcurrentStack<T>(new T[capacity]);
_factory = factory;
_onDisposeItem = onDisposeItem;
_sem = new SemaphoreSlim(capacity);
}
public async Task<T> CheckOutAsync()
{
await _sem.WaitAsync();
return Pop();
}
public T CheckOut()
{
_sem.Wait();
return Pop();
}
public void CheckIn(T item)
{
Push(item);
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
if (_onDisposeItem != null)
{
T item;
while (_itemsStack.TryPop(out item))
{
if (item != null)
_onDisposeItem(item);
}
}
}
private T Pop()
{
T item;
var result = _itemsStack.TryPop(out item);
Debug.Assert(result);
return item ?? _factory();
}
private void Push(T item)
{
Debug.Assert(item != null);
_itemsStack.Push(item);
}
}
Run Code Online (Sandbox Code Playgroud)