How can I make this Concurrent Dictionary expire with a timer?

jxm*_*ler 6 c# caching concurrentdictionary

This code seems to do a good job of caching async method results. I would like to add some sort of expiration to it. I have tried Tuple but I was not successful in getting it to fully work / compile.

private static readonly ConcurrentDictionary<object, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<object, SemaphoreSlim>();
    private static readonly ConcurrentDictionary<object, Tuple<List<UnitDTO>, DateTime>> _cache = new ConcurrentDictionary<object, Tuple<List<UnitDTO>, DateTime>>();

public async Task<string> GetSomethingAsync(string key)
{   
    string value;
    // get the semaphore specific to this key
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
    await keyLock.WaitAsync();
    try
    {
        // try to get value from cache
        if (!_cache.TryGetValue(key, out value))
        {
            // if value isn't cached, get it the long way asynchronously
            value = await GetSomethingTheLongWayAsync();

            // cache value
            _cache.TryAdd(key, value);
        }
    }
    finally
    {
        keyLock.Release();
    }
    return value;
}
Run Code Online (Sandbox Code Playgroud)

小智 5

经典方法和引文

来自msdn,作者:Stephen Cleary

异步代码通常用于初始化然后缓存和共享的资源。没有为此提供内置类型,但 Stephen Toub 开发了一个 AsyncLazy,其作用类似于 Task 和 Lazy 的合并。他的博客中描述了原始类型,我的 AsyncEx库中提供了更新版本。

public class AsyncLazy<T> : Lazy<Task<T>> 
{ 
    public AsyncLazy(Func<T> valueFactory) : 
        base(() => Task.Factory.StartNew(valueFactory)) { }
    public AsyncLazy(Func<Task<T>> taskFactory) : 
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap()) { } 
}
Run Code Online (Sandbox Code Playgroud)

语境

假设在我们的程序中,我们有以下 AsyncLazy 实例之一:

static string LoadString() { … }
static AsyncLazy<string> m_data = new AsyncLazy<string>(LoadString);
Run Code Online (Sandbox Code Playgroud)

用法

因此,我们可以编写一个异步方法来执行以下操作:

string data = await m_data.Value;
Run Code Online (Sandbox Code Playgroud)

Lazy<T>是合适的,但不幸的是它似乎缺少输入参数指标的结果。此处解决相同的问题,其中解释了如何缓存来自长时间运行的资源密集型方法的结果,以防它不是异步的

回到您提出的解决方案

在我展示与缓存管理相关的主要更改以及针对您提议的实现的主要更改之前,让我建议几个边缘优化选项,基于以下问题

通常使用锁,当您访问它们时,它们是无竞争的,在这种情况下,您确实希望获取和释放锁的开销尽可能低;换句话说,访问无竞争锁应该涉及快速路径

由于它们只是性能优化技巧,我会将它们留在代码中,以便您可以在之前的特定情况下测量它们的效果。

  1. 您需要在等待后再次测试 TryGetValue,因为在此期间另一个并行进程可能已添加该值
  2. 您在等待时无需保持锁定

在之前对类似问题的回答中已经指出了这种开销与缓存未命中的平衡

显然,保留 SemaphoreSlim 对象以防止缓存未命中是有开销的,因此根据用例,这可能不值得。但是,如果保证没有缓存未命中很重要,那么就可以做到这一点。

我的主要答案:缓存管理

关于缓存过期,我建议将创建日期时间添加到字典的值(即从 GetSomethingTheLongWayAsync 返回值的时间),并因此在固定时间跨度后丢弃缓存值。

在下面找到草稿

    private static readonly ConcurrentDictionary<object, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<object, SemaphoreSlim>();
    private static readonly ConcurrentDictionary<object, Tuple<string, DateTime>> _cache = new ConcurrentDictionary<object, Tuple<string, DateTime>>();


    private static bool IsExpiredDelete(Tuple<string, DateTime> value, string key)
    {
        bool _is_exp = (DateTime.Now - value.Item2).TotalMinutes > Expiration;
        if (_is_exp)
        {
            _cache.TryRemove(key, out value);
        }
        return _is_exp;
    }
    public async Task<string> GetSomethingAsync(string key)
    {
        Tuple<string, DateTime> cached;
        // get the semaphore specific to this key
        var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
        await keyLock.WaitAsync();
        try
        {
            // try to get value from cache
            if (!_cache.TryGetValue(key, out cached) || IsExpiredDelete(cached,key))
            {
                //possible performance optimization: measure it before uncommenting
                //keyLock.Release();
                string value = await GetSomethingTheLongWayAsync(key);
                DateTime creation = DateTime.Now; 
                // in case of performance optimization
                // get the semaphore specific to this key
                //keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
                //await keyLock.WaitAsync();
                bool notFound;
                if (notFound = !_cache.TryGetValue(key, out cached) || IsExpiredDelete(cached, key))
                {
                    cached = new Tuple<string, DateTime>(value, creation);
                    _cache.TryAdd(key, cached);
                }
                else
                {
                    if (!notFound && cached.Item2 < creation)
                    {
                        cached = new Tuple<string, DateTime>(value, creation);
                    _cache.TryAdd(key, cached);
                    }
                }
            }
        }
        finally
        {
            keyLock.Release();
        }
        return cached?.Item1;
    }
Run Code Online (Sandbox Code Playgroud)

请根据您的特定需求调整上述代码。

让它更通用

最后,您可能想对其进行一些概括。

顺便说一句,请注意,Dictionary不是 static因为一个可以缓存2种不同与方法相同的签名。

public class Cached<FromT, ToT>
{
    private Func<FromT, Task<ToT>> GetSomethingTheLongWayAsync;
    public Cached (Func<FromT, Task<ToT>> _GetSomethingTheLongWayAsync, int expiration_min ) {
        GetSomethingTheLongWayAsync = _GetSomethingTheLongWayAsync;
        Expiration = expiration_min;
}

    int Expiration = 1;

    private ConcurrentDictionary<FromT, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<FromT, SemaphoreSlim>();
    private ConcurrentDictionary<FromT, Tuple<ToT, DateTime>> _cache = new ConcurrentDictionary<FromT, Tuple<ToT, DateTime>>();


    private bool IsExpiredDelete(Tuple<ToT, DateTime> value, FromT key)
    {
        bool _is_exp = (DateTime.Now - value.Item2).TotalMinutes > Expiration;
        if (_is_exp)
        {
            _cache.TryRemove(key, out value);
        }
        return _is_exp;
    }
    public async Task<ToT> GetSomethingAsync(FromT key)
    {
        Tuple<ToT, DateTime> cached;
        // get the semaphore specific to this key
        var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
        await keyLock.WaitAsync();
        try
        {
            // try to get value from cache
            if (!_cache.TryGetValue(key, out cached) || IsExpiredDelete(cached, key))
            {
                //possible performance optimization: measure it before uncommenting
                //keyLock.Release();
                ToT value = await GetSomethingTheLongWayAsync(key);
                DateTime creation = DateTime.Now;
                // in case of performance optimization
                // get the semaphore specific to this key
                //keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1));
                //await keyLock.WaitAsync();
                bool notFound;
                if (notFound = !_cache.TryGetValue(key, out cached) || IsExpiredDelete(cached, key))
                {
                    cached = new Tuple<ToT, DateTime>(value, creation);
                    _cache.TryAdd(key, cached);
                }
                else
                {
                    if (!notFound && cached.Item2 < creation)
                    {
                        cached = new Tuple<ToT, DateTime>(value, creation);
                        _cache.TryAdd(key, cached);
                    }
                }
            }
        }
        finally
        {
            keyLock.Release();
        }
        return cached.Item1;
    }

}
Run Code Online (Sandbox Code Playgroud)

对于泛型FromTIEqualityComparer需要Dictionary

使用/演示

    private static async Task<string> GetSomethingTheLongWayAsync(int key)
    {
        await Task.Delay(15000);
        Console.WriteLine("Long way for: " + key);
        return key.ToString();
    }

    static void Main(string[] args)
    {
        Test().Wait();
    }

    private static async Task Test()
    {
        int key;
        string val;
        key = 1;
        var cache = new Cached<int, string>(GetSomethingTheLongWayAsync, 1);
        Console.WriteLine("getting " + key);
        val = await cache.GetSomethingAsync(key);
        Console.WriteLine("getting " + key + " resulted in " + val);

        Console.WriteLine("getting " + key);
        val = await cache.GetSomethingAsync(key);
        Console.WriteLine("getting " + key + " resulted in " + val);

        await Task.Delay(65000);

        Console.WriteLine("getting " + key);
        val = await cache.GetSomethingAsync(key);
        Console.WriteLine("getting " + key + " resulted in " + val);
        Console.ReadKey();
    }
Run Code Online (Sandbox Code Playgroud)

复杂的替代品

还有更高级的可能性,比如 GetOrAdd 的重载,它接受一个委托和 Lazy 对象,以确保生成器函数只被调用一次(而不是信号量和锁)。

   public class AsyncCache<FromT, ToT>
    {
        private Func<FromT, Task<ToT>> GetSomethingTheLongWayAsync;
        public AsyncCache(Func<FromT, Task<ToT>> _GetSomethingTheLongWayAsync, int expiration_min)
        {
            GetSomethingTheLongWayAsync = _GetSomethingTheLongWayAsync;
            Expiration = expiration_min;
        }

        int Expiration;

        private ConcurrentDictionary<FromT, Tuple<Lazy<Task<ToT>>, DateTime>> _cache = 
            new ConcurrentDictionary<FromT, Tuple<Lazy<Task<ToT>>, DateTime>>();


        private bool IsExpiredDelete(Tuple<Lazy<Task<ToT>>, DateTime> value, FromT key)
        {
            bool _is_exp = (DateTime.Now - value.Item2).TotalMinutes > Expiration;
            if (_is_exp)
            {
                _cache.TryRemove(key, out value);
            }
            return _is_exp;
        }
        public async Task<ToT> GetSomethingAsync(FromT key)
        {
            var res = _cache.AddOrUpdate(key,
                t =>  new Tuple<Lazy<Task<ToT>>, DateTime>(new Lazy<Task<ToT>>(
                      () => GetSomethingTheLongWayAsync(key)
                    )
                , DateTime.Now) ,
                (k,t) =>
                {
                    if (IsExpiredDelete(t, k))
                    {
                        return new Tuple<Lazy<Task<ToT>>, DateTime>(new Lazy<Task<ToT>>(
                      () => GetSomethingTheLongWayAsync(k)
                    ), DateTime.Now);
                    }
                    return t;
                }

                );
            return await res.Item1.Value;
        }

    }
Run Code Online (Sandbox Code Playgroud)

用法相同,只需替换AsyncCache而不是Cached.

  • 在您的情况下,您的代码当然有可能无法解决此错误,或者您对由于缓存无法正常工作而导致的性能问题感到满意。这并不意味着您应该不遗余力地试图让其他人积极地使他们的缓存变得更糟。 (2认同)
  • 您断言多次执行完全相同的操作比只执行一次相同的操作要快,并且您对该断言的支持是您对它进行计时并且速度更快的说法。如果有人走到你面前告诉你,走 5 英里比走 1 英里要快,你应该相信他们,因为他们计时了,你会相信他们的话,永远开始步行 5 倍的距离吗?你去哪里? (2认同)
  • Downvoter,关心添加评论吗?10x ... 需要明确的是,@Servy 写道,如果我在 t0 之后在同一个数据库上启动相同的查询,它将以相同的延迟 t0 返回相同的结果,而我回答 - 基于*真实*世界编程, t0 既不是常数也不是 *单调* 函数。如果新的 * 迟到 * downvoter 有不同的论点,我很想阅读它们 ;-) (2认同)