C#Object Pooling Pattern实现

Chr*_*sic 160 c# design-patterns pooling

有没有人有很好的资源来实现Sql连接池静脉中有限资源的共享对象池策略?(即完全实现它是线程安全的).

要跟进@Aaronaught请求澄清,池的使用将用于对外部服务的负载平衡请求.把它放在一个可能更容易立即理解的场景中,而不是直接的场景.我有一个会话对象,其功能ISession与NHibernate中的对象类似.每个唯一会话管理它与数据库的连接.目前我有一个长时间运行的会话对象,我遇到的问题是我的服务提供商限制我对这个单独会话的使用.

由于他们缺乏将单个会话视为长期服务帐户的期望,他们显然将其视为正在锤击其服务的客户.这让我想到了我的问题,而不是只有一个单独的会话,我会创建一个不同会话池,并将请求分成多个会话的服务,而不是像我以前那样创建一个单一的焦点.

希望该背景提供一些价值,但直接回答您的一些问题:

问:创建的对象是否昂贵?
答:没有对象是有限资源的池

问:他们会被频繁获得/释放吗?
答:是的,可以再次考虑NHibernate ISessions,其中通常在每个页面请求的持续时间内获取和释放1.

问:简单的先来先服务是否足够,或者你需要更聪明的东西,即能防止饥饿吗?
答:一个简单的循环类型分发就足够了,饥饿我认为你的意思是,如果没有可用的会话,呼叫者被阻塞等待发布.这不是真正适用的,因为会话可以由不同的呼叫者共享.我的目标是在多个会话中分配使用情况,而不是单个会话.

我认为这可能与对象池的正常使用有所不同,这就是为什么我最初将这部分留下来并计划仅仅为了调整模式以允许共享对象而不是允许饥饿情况发生.

问:优先事项,懒惰与急切加载等问题如何?
答:没有涉及优先级,为简单起见,假设我将在创建池本身时创建可用对象池.

Aar*_*ght 309

由于几个未知因素,这个问题比人们预期的要复杂一点:资源的行为被汇集,对象的预期/需要的生命周期,需要池的真正原因等等.通常,池是特殊用途的 - 线程池,连接池等 - 因为当您确切知道资源的作用时更容易优化,更重要的是可以控制资源的实现方式.

因为它并不那么简单,我试图做的是提供一种相当灵活的方法,你可以试验并看看哪种方法效果最好. 为长期职位提前道歉,但在实施一个体面的通用资源池方面有很多理由要做.而且我真的只是摸不着头脑.

通用池必须有一些主要的"设置",包括:

  • 资源加载策略 - 渴望或懒惰;
  • 资源加载机制 - 如何实际构建一个;
  • 访问策略 - 你提到"循环",它不像听起来那么简单; 此实现可以使用类似但不完美的循环缓冲区,因为池无法控制何时实际回收资源.其他选项包括FIFO和LIFO; FIFO将具有更多的随机访问模式,但LIFO使得实现最近最少使用的释放策略(您说它超出范围,但仍然值得一提)变得更加容易.

对于资源加载机制,.NET已经为我们提供了一个干净的抽象 - 委托.

private Func<Pool<T>, T> factory;
Run Code Online (Sandbox Code Playgroud)

通过池的构造函数传递这个,我们就完成了.使用带new()约束的泛型类型也可以,但这更灵活.


在其他两个参数中,访问策略是更复杂的野兽,所以我的方法是使用基于继承(接口)的方法:

public class Pool<T> : IDisposable
{
    // Other code - we'll come back to this

    interface IItemStore
    {
        T Fetch();
        void Store(T item);
        int Count { get; }
    }
}
Run Code Online (Sandbox Code Playgroud)

这里的概念很简单 - 我们将让公共Pool类处理线程安全等常见问题,但为每个访问模式使用不同的"项目存储".LIFO很容易用堆栈表示,FIFO是队列,我使用了一个非常优化但很可能足够的循环缓冲区实现,使用List<T>和索引指针来近似循环访问模式.

下面的所有类都是内部类Pool<T>- 这是一种风格选择,但由于这些并不意味着在外面使用Pool,因此它最有意义.

    class QueueStore : Queue<T>, IItemStore
    {
        public QueueStore(int capacity) : base(capacity)
        {
        }

        public T Fetch()
        {
            return Dequeue();
        }

        public void Store(T item)
        {
            Enqueue(item);
        }
    }

    class StackStore : Stack<T>, IItemStore
    {
        public StackStore(int capacity) : base(capacity)
        {
        }

        public T Fetch()
        {
            return Pop();
        }

        public void Store(T item)
        {
            Push(item);
        }
    }
Run Code Online (Sandbox Code Playgroud)

这些是显而易见的 - 堆栈和队列.我认为他们真的不需要太多解释.循环缓冲区有点复杂:

    class CircularStore : IItemStore
    {
        private List<Slot> slots;
        private int freeSlotCount;
        private int position = -1;

        public CircularStore(int capacity)
        {
            slots = new List<Slot>(capacity);
        }

        public T Fetch()
        {
            if (Count == 0)
                throw new InvalidOperationException("The buffer is empty.");

            int startPosition = position;
            do
            {
                Advance();
                Slot slot = slots[position];
                if (!slot.IsInUse)
                {
                    slot.IsInUse = true;
                    --freeSlotCount;
                    return slot.Item;
                }
            } while (startPosition != position);
            throw new InvalidOperationException("No free slots.");
        }

        public void Store(T item)
        {
            Slot slot = slots.Find(s => object.Equals(s.Item, item));
            if (slot == null)
            {
                slot = new Slot(item);
                slots.Add(slot);
            }
            slot.IsInUse = false;
            ++freeSlotCount;
        }

        public int Count
        {
            get { return freeSlotCount; }
        }

        private void Advance()
        {
            position = (position + 1) % slots.Count;
        }

        class Slot
        {
            public Slot(T item)
            {
                this.Item = item;
            }

            public T Item { get; private set; }
            public bool IsInUse { get; set; }
        }
    }
Run Code Online (Sandbox Code Playgroud)

我本可以选择一些不同的方法,但最重要的是应该按照创建它们的顺序访问资源,这意味着我们必须保持对它们的引用,但将它们标记为"在使用中"(或不是).在最坏的情况下,只有一个插槽可用,并且每次获取都需要完整的缓冲区迭代.如果您有数百个资源汇集并且每秒多次获取和释放它们,那就太糟糕了; 对于5-10个项目的池来说并不是真正的问题,并且在典型的情况下,资源被轻微使用,它只需要前进一个或两个插槽.

请记住,这些类是私有内部类 - 这就是为什么它们不需要进行大量的错误检查,池本身限制了对它们的访问.

抛出枚举和工厂方法,我们完成了这部分:

// Outside the pool
public enum AccessMode { FIFO, LIFO, Circular };

    private IItemStore itemStore;

    // Inside the Pool
    private IItemStore CreateItemStore(AccessMode mode, int capacity)
    {
        switch (mode)
        {
            case AccessMode.FIFO:
                return new QueueStore(capacity);
            case AccessMode.LIFO:
                return new StackStore(capacity);
            default:
                Debug.Assert(mode == AccessMode.Circular,
                    "Invalid AccessMode in CreateItemStore");
                return new CircularStore(capacity);
        }
    }
Run Code Online (Sandbox Code Playgroud)

要解决的下一个问题是加载策略.我已经定义了三种类型:

public enum LoadingMode { Eager, Lazy, LazyExpanding };
Run Code Online (Sandbox Code Playgroud)

前两个应该是不言自明的; 第三种是混合型,它延迟加载资源但实际上并没有开始重新使用任何资源,直到池满了.如果你想让游泳池充满(听起来像你这样做),这将是一个很好的权衡,但是想要推迟实际创建它们的费用,直到第一次访问(即改善启动时间).

加载方法真的不太复杂,现在我们有了项目存储抽象:

    private int size;
    private int count;

    private T AcquireEager()
    {
        lock (itemStore)
        {
            return itemStore.Fetch();
        }
    }

    private T AcquireLazy()
    {
        lock (itemStore)
        {
            if (itemStore.Count > 0)
            {
                return itemStore.Fetch();
            }
        }
        Interlocked.Increment(ref count);
        return factory(this);
    }

    private T AcquireLazyExpanding()
    {
        bool shouldExpand = false;
        if (count < size)
        {
            int newCount = Interlocked.Increment(ref count);
            if (newCount <= size)
            {
                shouldExpand = true;
            }
            else
            {
                // Another thread took the last spot - use the store instead
                Interlocked.Decrement(ref count);
            }
        }
        if (shouldExpand)
        {
            return factory(this);
        }
        else
        {
            lock (itemStore)
            {
                return itemStore.Fetch();
            }
        }
    }

    private void PreloadItems()
    {
        for (int i = 0; i < size; i++)
        {
            T item = factory(this);
            itemStore.Store(item);
        }
        count = size;
    }
Run Code Online (Sandbox Code Playgroud)

上面的sizecount字段分别表示池的最大大小和池拥有的资源总数(但不一定是可用的). AcquireEager是最简单的,它假定一个项目已经存储在商店中 - 这些项目将在构造时预加载,即在PreloadItems最后显示的方法中.

AcquireLazy检查池中是否有免费项目,如果没有,则创建一个新项目. AcquireLazyExpanding只要池尚未达到其目标大小,它将创建一个新资源.我试图优化这个以最小化锁定,我希望我没有犯任何错误(我已经在多线程条件下测试了这个,但显然不是详尽无遗).

您可能想知道为什么这些方法都不会检查商店是否达到最大尺寸.我马上就会谈到这一点.


现在为游泳池本身.以下是完整的私有数据集,其中一些已经显示:

    private bool isDisposed;
    private Func<Pool<T>, T> factory;
    private LoadingMode loadingMode;
    private IItemStore itemStore;
    private int size;
    private int count;
    private Semaphore sync;
Run Code Online (Sandbox Code Playgroud)

回答我在最后一段中掩饰的问题 - 如何确保我们限制创建的资源总数 - 事实证明.NET已经拥有了一个非常好的工具,它被称为Semaphore,它专门设计用于修复线程数访问资源(在这种情况下,"资源"是内部项目存储).由于我们没有实现完整的生产者/消费者队列,这完全足以满足我们的需求.

构造函数如下所示:

    public Pool(int size, Func<Pool<T>, T> factory,
        LoadingMode loadingMode, AccessMode accessMode)
    {
        if (size <= 0)
            throw new ArgumentOutOfRangeException("size", size,
                "Argument 'size' must be greater than zero.");
        if (factory == null)
            throw new ArgumentNullException("factory");

        this.size = size;
        this.factory = factory;
        sync = new Semaphore(size, size);
        this.loadingMode = loadingMode;
        this.itemStore = CreateItemStore(accessMode, size);
        if (loadingMode == LoadingMode.Eager)
        {
            PreloadItems();
        }
    }
Run Code Online (Sandbox Code Playgroud)

这里应该不出意外.唯一需要注意的是使用PreloadItems前面已经显示的方法进行预先加载的特殊外壳.

由于现在几乎所有东西都被彻底抽象掉了,实际AcquireRelease方法真的非常简单:

    public T Acquire()
    {
        sync.WaitOne();
        switch (loadingMode)
        {
            case LoadingMode.Eager:
                return AcquireEager();
            case LoadingMode.Lazy:
                return AcquireLazy();
            default:
                Debug.Assert(loadingMode == LoadingMode.LazyExpanding,
                    "Unknown LoadingMode encountered in Acquire method.");
                return AcquireLazyExpanding();
        }
    }

    public void Release(T item)
    {
        lock (itemStore)
        {
            itemStore.Store(item);
        }
        sync.Release();
    }
Run Code Online (Sandbox Code Playgroud)

如前所述,我们使用Semaphore控制并发而不是虔诚地检查项目存储的状态.只要获得的物品被正确释放,就没有什么可担心的了.

最后但并非最不重要的是,有清理:

    public void Dispose()
    {
        if (isDisposed)
        {
            return;
        }
        isDisposed = true;
        if (typeof(IDisposable).IsAssignableFrom(typeof(T)))
        {
            lock (itemStore)
            {
                while (itemStore.Count > 0)
                {
                    IDisposable disposable = (IDisposable)itemStore.Fetch();
                    disposable.Dispose();
                }
            }
        }
        sync.Close();
    }

    public bool IsDisposed
    {
        get { return isDisposed; }
    }
Run Code Online (Sandbox Code Playgroud)

IsDisposed属性的目的将在一瞬间变得清晰.所有主要Dispose方法确实是如果它们实现的话,处理实际的池化项IDisposable.


现在你基本上可以使用这个try-finally块,但是我不喜欢那种语法,因为如果你开始在类和方法之间传递池化资源,那么它将变得非常混乱.使用资源的主类甚至可能没有对池的引用.它真的变得非常混乱,所以更好的方法是创建一个"智能"池化对象.

假设我们从以下简单的接口/类开始:

public interface IFoo : IDisposable
{
    void Test();
}

public class Foo : IFoo
{
    private static int count = 0;

    private int num;

    public Foo()
    {
        num = Interlocked.Increment(ref count);
    }

    public void Dispose()
    {
        Console.WriteLine("Goodbye from Foo #{0}", num);
    }

    public void Test()
    {
        Console.WriteLine("Hello from Foo #{0}", num);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我们假装的一次性Foo资源,它实现IFoo并具有一些用于生成唯一标识的样板代码.我们所做的是创建另一个特殊的池化对象:

public class PooledFoo : IFoo
{
    private Foo internalFoo;
    private Pool<IFoo> pool;

    public PooledFoo(Pool<IFoo> pool)
    {
        if (pool == null)
            throw new ArgumentNullException("pool");

        this.pool = pool;
        this.internalFoo = new Foo();
    }

    public void Dispose()
    {
        if (pool.IsDisposed)
        {
            internalFoo.Dispose();
        }
        else
        {
            pool.Release(this);
        }
    }

    public void Test()
    {
        internalFoo.Test();
    }
}
Run Code Online (Sandbox Code Playgroud)

这只是将所有"真实"方法代理到其内部IFoo(我们可以使用像Castle这样的动态代理库来实现这一点,但我不会深入研究).它还维护对Pool创建它的引用,这样当我们Dispose这个对象时,它会自动将自身释放回池中. 除了池已被处理 - 这意味着我们处于"清理"模式,在这种情况下,它实际上清理了内部资源.


使用上面的方法,我们可以编写如下代码:

// Create the pool early
Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p),
    LoadingMode.Lazy, AccessMode.Circular);

// Sometime later on...
using (IFoo foo = pool.Acquire())
{
    foo.Test();
}
Run Code Online (Sandbox Code Playgroud)

这是一件非常好的事情.这就是说,该代码使用IFoo(而不是其创建它的代码)实际上并不需要知道池.您甚至可以使用您喜欢的DI库和作为提供者/工厂注入 IFoo对象Pool<T>.


我已将完整的代码放在PasteBin上,以便您进行复制和粘贴.还有一个简短的测试程序,您可以使用它来使用不同的加载/访问模式和多线程条件,以确保它是线程安全的而不是错误的.

如果您对此有任何问题或疑虑,请与我们联系.

  • 我读过的最全面,最有帮助,最有趣的答案之一. (61认同)
  • 非常令人印象深刻,虽然在大多数情况下有点过度设计.我希望像这样的东西成为框架的一部分. (5认同)

Muh*_*eed 52

.NET Core中的对象池

DOTNET芯具有添加到该基类库(BCL)对象池的实施方案.您可以在此处阅读原始GitHub问题并查看System.Buffers的代码.目前,ArrayPool它是唯一可用的类型,用于池阵列.有一个很好的博客文章在这里.

namespace System.Buffers
{
    public abstract class ArrayPool<T>
    {
        public static ArrayPool<T> Shared { get; internal set; }

        public static ArrayPool<T> Create(int maxBufferSize = <number>, int numberOfBuffers = <number>);

        public T[] Rent(int size);

        public T[] Enlarge(T[] buffer, int newSize, bool clearBuffer = false);

        public void Return(T[] buffer, bool clearBuffer = false);
    }
}
Run Code Online (Sandbox Code Playgroud)

可以在ASP.NET Core中看到它的用法示例.因为它位于dotnet核心BCL中,所以ASP.NET Core可以与其他对象(如Newtonsoft.Json的JSON序列化程序)共享它的对象池.您可以阅读博客文章,了解有关Newtonsoft.Json如何执行此操作的更多信息.

Microsoft Roslyn C#编译器中的对象池

新的Microsoft Roslyn C#编译器包含ObjectPool类型,该类型用于池化经常使用的对象,这些对象通常会被新用起来并经常收集垃圾.这减少了必须发生的垃圾收集操作的数量和大小.有几个不同的子实现都使用ObjectPool(请参阅:为什么Roslyn中有如此多的Object Pooling实现?).

1 - SharedPools - 如果使用BigDefault,则存储20个对象的池或100个.

// Example 1 - In a using statement, so the object gets freed at the end.
using (PooledObject<Foo> pooledObject = SharedPools.Default<List<Foo>>().GetPooledObject())
{
    // Do something with pooledObject.Object
}

// Example 2 - No using statement so you need to be sure no exceptions are not thrown.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
// Do something with list
SharedPools.Default<List<Foo>>().Free(list);

// Example 3 - I have also seen this variation of the above pattern, which ends up the same as Example 1, except Example 1 seems to create a new instance of the IDisposable [PooledObject<T>][4] object. This is probably the preferred option if you want fewer GC's.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
try
{
    // Do something with list
}
finally
{
    SharedPools.Default<List<Foo>>().Free(list);
}
Run Code Online (Sandbox Code Playgroud)

2 - ListPoolStringBuilderPool - 不是严格单独的实现,而是围绕上面显示的SharedPools实现的包装器,专门用于List和StringBuilder.因此,这将重新使用存储在SharedPools中的对象池.

// Example 1 - No using statement so you need to be sure no exceptions are thrown.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
// Do something with stringBuilder
StringBuilderPool.Free(stringBuilder);

// Example 2 - Safer version of Example 1.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
try
{
    // Do something with stringBuilder
}
finally
{
    StringBuilderPool.Free(stringBuilder);
}
Run Code Online (Sandbox Code Playgroud)

3 - PooledDictionaryPooledHashSet - 它们直接使用ObjectPool并具有完全独立的对象池.存储128个对象的池.

// Example 1
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
// Do something with hashSet.
hashSet.Free();

// Example 2 - Safer version of Example 1.
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
try
{
    // Do something with hashSet.
}
finally
{
    hashSet.Free();
}
Run Code Online (Sandbox Code Playgroud)

Microsoft.IO.RecyclableMemoryStream

该库为MemoryStream对象提供池.这是一个替代品System.IO.MemoryStream.它具有完全相同的语义.它由Bing工程师设计.阅读此处的博客文章或查看GitHub上的代码.

var sourceBuffer = new byte[]{0,1,2,3,4,5,6,7}; 
var manager = new RecyclableMemoryStreamManager(); 
using (var stream = manager.GetStream()) 
{ 
    stream.Write(sourceBuffer, 0, sourceBuffer.Length); 
}
Run Code Online (Sandbox Code Playgroud)

请注意,RecyclableMemoryStreamManager应该声明一次,它将在整个过程中生效 - 这就是池.如果您愿意,可以使用多个游泳池.

  • 这是一个很好的答案。在C#6和VS2015成为RTM之后,我很可能将其作为公认的答案,因为如果Rosyln自己对它进行如此调优,那么这显然是最好的。 (2认同)

Cha*_*ion 7

这样的事情可能适合您的需求.

/// <summary>
/// Represents a pool of objects with a size limit.
/// </summary>
/// <typeparam name="T">The type of object in the pool.</typeparam>
public sealed class ObjectPool<T> : IDisposable
    where T : new()
{
    private readonly int size;
    private readonly object locker;
    private readonly Queue<T> queue;
    private int count;


    /// <summary>
    /// Initializes a new instance of the ObjectPool class.
    /// </summary>
    /// <param name="size">The size of the object pool.</param>
    public ObjectPool(int size)
    {
        if (size <= 0)
        {
            const string message = "The size of the pool must be greater than zero.";
            throw new ArgumentOutOfRangeException("size", size, message);
        }

        this.size = size;
        locker = new object();
        queue = new Queue<T>();
    }


    /// <summary>
    /// Retrieves an item from the pool. 
    /// </summary>
    /// <returns>The item retrieved from the pool.</returns>
    public T Get()
    {
        lock (locker)
        {
            if (queue.Count > 0)
            {
                return queue.Dequeue();
            }

            count++;
            return new T();
        }
    }

    /// <summary>
    /// Places an item in the pool.
    /// </summary>
    /// <param name="item">The item to place to the pool.</param>
    public void Put(T item)
    {
        lock (locker)
        {
            if (count < size)
            {
                queue.Enqueue(item);
            }
            else
            {
                using (item as IDisposable)
                {
                    count--;
                }
            }
        }
    }

    /// <summary>
    /// Disposes of items in the pool that implement IDisposable.
    /// </summary>
    public void Dispose()
    {
        lock (locker)
        {
            count = 0;
            while (queue.Count > 0)
            {
                using (queue.Dequeue() as IDisposable)
                {

                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

示例用法

public class ThisObject
{
    private readonly ObjectPool<That> pool = new ObjectPool<That>(100);

    public void ThisMethod()
    {
        var that = pool.Get();

        try
        { 
            // Use that ....
        }
        finally
        {
            pool.Put(that);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)