资源池的正确实现方法

Mat*_*and 2 c# multithreading pool

我正在尝试实现一些管理资源池的东西,以便调用代码可以请求一个对象,并且如果可用,将从池中获取一个对象,否则将使其等待。但是,我无法使同步正常工作。我的池类中的内容是这样的(其中初始autoEvent设置AutoResetEvent为信号:

public Foo GetFooFromPool()
{
    autoEvent.WaitOne();
    var foo = Pool.FirstOrDefault(p => !p.InUse);
    if (foo != null)
    {
        foo.InUse = true;
        autoEvent.Set();
        return foo;
    }
    else if (Pool.Count < Capacity)
    {
        System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
        foo = new Foo() { InUse = true };
        Pool.Add(foo);
        autoEvent.Set();
        return foo;
    }
    else
    {
        return GetFooFromPool();
    }
}

public void ReleaseFoo(Foo p)
{
    p.InUse = false;
    autoEvent.Set();
}
Run Code Online (Sandbox Code Playgroud)

这个想法是,当您调用 时GetFooFromPool,您会等到收到信号,然后尝试查找Foo未使用的现有对象。如果您找到一个,我们将其设置为InUse,然后发出一个信号,以便其他线程可以继续进行。如果找不到,我们会检查池是否已满。如果没有,我们创建一个新的Foo,将其添加到池中并再次发出信号。如果这两个条件都不满足,我们将再次致电等待GetFooFromPool

现在,ReleaseFoo我们只需将其设置InUse回 false,并向等待的下一个线程GetFooFromPool(如果有)发出信号,以尝试获取Foo.

问题似乎出在我管理池的大小上。容量为5,我最终得到6 Foos。我可以看到在我的调试行中count 0出现了几次,并且count 1也可能出现了几次。很明显,我有多个线程进入该块,但据我所知,它们不应该能够进入该块。

我在这里做错了什么?

编辑:像这样的双重检查锁:

else if (Pool.Count < Capacity)
{
    lock(locker)
    {
        if (Pool.Count < Capacity)
        {
            System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
            foo = new Foo() { InUse = true };
            Pool.Add(foo);
            autoEvent.Set();
            return foo;
        }
    }
} 
Run Code Online (Sandbox Code Playgroud)

似乎确实解决了问题,但我不确定这是最优雅的方法。

Ale*_*lex 7

正如评论中已经提到的,计数信号量是你的朋友。将其与并发堆栈结合起来,您就得到了一个简单、线程安全的实现,您仍然可以延迟分配池项。

下面的基本实现提供了这种方法的示例。请注意,这里的另一个优点是您不需要使用成员InUse作为跟踪内容的标志来“污染”池项目。

请注意,作为微优化,在这种情况下,堆栈优于队列,因为它将提供从池中最近返回的实例,该实例可能仍位于 L1 缓存中。

public class GenericConcurrentPool<T> : IDisposable where T : class
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentStack<T> _itemsStack;
    private readonly Action<T> _onDisposeItem;
    private readonly Func<T> _factory;

    public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
    {
        _itemsStack = new ConcurrentStack<T>(new T[capacity]);
        _factory = factory;
        _onDisposeItem = onDisposeItem;
        _sem = new SemaphoreSlim(capacity);
    }

    public async Task<T> CheckOutAsync()
    {
        await _sem.WaitAsync();
        return Pop();
    }

    public T CheckOut()
    {
        _sem.Wait();
        return Pop();
    }

    public void CheckIn(T item)
    {
        Push(item);
        _sem.Release();
    }

    public void Dispose()
    {
        _sem.Dispose();
        if (_onDisposeItem != null)
        {
            T item;
            while (_itemsStack.TryPop(out item))
            {
                if (item != null)
                    _onDisposeItem(item);
            }
        }
    }

    private T Pop()
    {
        T item;
        var result = _itemsStack.TryPop(out item);
        Debug.Assert(result);
        return item ?? _factory();
    }

    private void Push(T item)
    {
        Debug.Assert(item != null);
        _itemsStack.Push(item);
    }
}
Run Code Online (Sandbox Code Playgroud)