Async threadsafe从MemoryCache获取

And*_*ers 16 c# thread-safety async-await

我创建了一个MemoryCache在下面使用.NET的异步缓存.这是代码

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if(!_cache.Contains(key))
    {
        var data = await populator();
        lock(_cache)
        {
            if(!_cache.Contains(key)) //Check again but locked this time
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
        }
    }

    return (T)_cache.Get(key);
}
Run Code Online (Sandbox Code Playgroud)

我认为唯一的缺点是我需要在锁外等待,所以populator不是线程安全的,但是因为await不能驻留在锁内,我想这是最好的方法.我错过了任何陷阱吗?

更新:当antoher线程使缓存无效时,Esers应答的版本也是线程安全的

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = new Lazy<Task<T>>(populator, true);
    _cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
    return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}
Run Code Online (Sandbox Code Playgroud)

但它可能会更慢,因为它会创建永远不会执行的Lazy实例,并且它在完全线程安全模式下使用Lazy LazyThreadSafetyMode.ExecutionAndPublication

使用新基准更新(更高更好)

Lazy with lock      42535929
Lazy with GetOrAdd  41070320 (Only solution that is completely thread safe)
Semaphore           64573360
Run Code Online (Sandbox Code Playgroud)

Yuv*_*kov 10

一个简单的解决方案是使用SemaphoreSlim.WaitAsync()而不是锁,然后你可以解决等待锁内的问题.虽然,所有其他方法MemoryCache都是线程安全的.

private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
            string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if (!_cache.Contains(key))
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            if (!_cache.Contains(key))
            {
                var data = await populator();
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    return (T)_cache.Get(key);
}
Run Code Online (Sandbox Code Playgroud)

  • 是的,但是包含密钥检查然后插入缓存不是 (2认同)
  • 如果在检查 `!_cache.Contains(key)` 之后但在执行 `return (T)_cache.Get` 之前,由于过期策略,该项目从缓存中删除怎么办?前者应该是`AddOrGetExisting`。实际上,内部`_cache.Contains(key)` 也有同样的问题。 (2认同)
  • @OhadSchneider 你说得对。我的回答的主要重点不是重新定义 OP 调用的语义,而是展示如何将 `SemaphoreSlim` 与异步调用结合使用。我将重申解决到期问题的代码。 (2认同)
  • 这不是一个解决方案。每次调用缓存时,它可能都必须等待其他调用。如果您尝试缓存多条数据怎么办?您可能会发现一件事需要 100 毫秒,但它总是必须等待需要 2-3 秒以上的事情。 (2认同)
  • 这不是一个解决方案,因为它共享所有缓存的锁。您至少需要每个键都有一个信号量(即保留键字典)才能完成这项工作。其他人在这里发布了解决方案:/sf/answers/4595047831/ (2认同)

Ese*_*ser 8

虽然有一个已经被接受的答案,但我会发布一个新Lazy<T>方法.想法是:尽量减少lock阻塞的持续时间,如果密钥在缓存中不存在,则放入Lazy<T>缓存.这样,同时使用相同密钥的所有线程将等待相同Lazy<T>的值

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            var lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
        }
    }

    return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}
Run Code Online (Sandbox Code Playgroud)

版本2

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = ((Lazy<Task<T>>)_cache.Get(key));
    if (lazy != null) return lazy.Value;

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
            return lazy.Value;
        }
        return ((Lazy<Task<T>>)_cache.Get(key)).Value;
    }
}
Run Code Online (Sandbox Code Playgroud)

版本3

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var task = (Task<T>)_cache.Get(key);
    if (task != null) return task;

    var value = populator();
    return 
     (Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
}
Run Code Online (Sandbox Code Playgroud)


Oha*_*der 7

目前的答案使用有点过时了System.Runtime.Caching.MemoryCache.它们还包含微妙的竞争条件(见评论).最后,并非所有这些都允许超时取决于要缓存的值.

这是我尝试使用新的Microsoft.Extensions.Caching.Memory(由ASP.NET Core使用):

//Add NuGet package: Microsoft.Extensions.Caching.Memory    

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());

public Task<T> GetOrAddAsync<T>(
        string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{    
    return _cache.GetOrCreateAsync(key, async cacheEntry => 
    {
        var cts = new CancellationTokenSource();
        cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
        var value = await factory().ConfigureAwait(false);
        cts.CancelAfter(expirationCalculator(value));
        return value;
    });
}
Run Code Online (Sandbox Code Playgroud)

样品用法:

await GetOrAddAsync("foo", () => Task.Run(() => 42), i  => TimeSpan.FromMilliseconds(i)));
Run Code Online (Sandbox Code Playgroud)

请注意,不能保证只调用一次工厂方法(请参阅https://github.com/aspnet/Caching/issues/240).


The*_*ias 5

这是对 Eser答案(第 2 版)的尝试改进。该Lazy班是由默认的线程安全的,因此lock可以被删除。可能Lazy会为一个给定的键创建多个对象,但只有一个对象会Value查询它的属性,从而导致重Task. 其他Lazys 将保持未使用状态,并将超出范围并很快成为垃圾收集。

第一个重载是灵活和通用的,并接受一个Func<CacheItemPolicy>参数。对于最常见的绝对和滑动到期情况,我还包含了两个重载。为方便起见,可以添加更多的重载。

using System.Runtime.Caching;

static partial class MemoryCacheExtensions
{
    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, Func<CacheItemPolicy> cacheItemPolicyFactory = null)
    {
        var lazyTask = (Lazy<Task<T>>)cache.Get(key);
        if (lazyTask == null)
        {
            var newLazyTask = new Lazy<Task<T>>(valueFactory);
            var cacheItem = new CacheItem(key, newLazyTask);
            var cacheItemPolicy = cacheItemPolicyFactory?.Invoke();
            var existingCacheItem = cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
            lazyTask = (Lazy<Task<T>>)existingCacheItem?.Value ?? newLazyTask;
        }
        return ToAsyncConditional(lazyTask.Value);
    }

    private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
    {
        if (task.IsCompleted) return task;
        return task.ContinueWith(t => t,
            default, TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default).Unwrap();
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
        {
            AbsoluteExpiration = absoluteExpiration,
        });
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
        {
            SlidingExpiration = slidingExpiration,
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

string html = await MemoryCache.Default.GetOrCreateLazyAsync("MyKey", async () =>
{
    return await new WebClient().DownloadStringTaskAsync("https://stackoverflow.com");
}, DateTimeOffset.Now.AddMinutes(10));
Run Code Online (Sandbox Code Playgroud)

此站点的 HTML 已下载并缓存 10 分钟。多个并发请求将await完成同一个任务。

System.Runtime.Caching.MemoryCache班是很容易使用,但对优先级缓存条目有限的支持。基本上只有两个选项,DefaultNotRemovable,这意味着它几乎不适用于高级场景。较新的Microsoft.Extensions.Caching.Memory.MemoryCache类(从这个包)提供了更多的选择对于缓存优先级(LowNormalHighNeverRemove),但在其他方面是不太直观,也更麻烦来使用。它提供异步功能,但不懒惰。所以这里是这个类的 LazyAsync 等效扩展:

using Microsoft.Extensions.Caching.Memory;

static partial class MemoryCacheExtensions
{
    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, MemoryCacheEntryOptions options = null)
    {
        if (!cache.TryGetValue(key, out Lazy<Task<T>> lazy))
        {
            var entry = cache.CreateEntry(key);
            if (options != null) entry.SetOptions(options);
            var newLazy = new Lazy<Task<T>>(valueFactory);
            entry.Value = newLazy;
            entry.Dispose(); // Dispose actually inserts the entry in the cache
            if (!cache.TryGetValue(key, out lazy)) lazy = newLazy;
        }
        return ToAsyncConditional(lazy.Value);
    }

    private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
    {
        if (task.IsCompleted) return task;
        return task.ContinueWith(t => t,
            default, TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default).Unwrap();
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory,
            new MemoryCacheEntryOptions() { AbsoluteExpiration = absoluteExpiration });
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory,
            new MemoryCacheEntryOptions() { SlidingExpiration = slidingExpiration });
    }
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateLazyAsync("MyKey", async () =>
{
    return await new WebClient().DownloadStringTaskAsync("https://stackoverflow.com");
}, DateTimeOffset.Now.AddMinutes(10));
Run Code Online (Sandbox Code Playgroud)

更新:我刚刚才知道的一个特殊功能的的async-await机制。当一个不完整的Task被并发等待多次时,延续将同步运行(在同一线程中)一个接一个(假设没有同步上下文)。对于 的上述实现,这可能是一个问题GetOrCreateLazyAsync,因为在等待调用 之后,阻塞代码可能立即存在GetOrCreateLazyAsync,在这种情况下,其他等待器将受到影响(延迟,甚至死锁)。此问题的一个可能解决方案是返回延迟创建的异步延续Task,而不是任务本身,但前提是任务未完成。这就是引入上述ToAsyncConditional方法的原因。


注意:此实现会缓存异步 lambda 调用期间可能发生的任何错误。一般来说,这可能不是一种理想的行为。我可能的解决方案是Lazy<Task<T>>AsyncLazy<T>Stephen Cleary 的Nito.AsyncEx.Coordination包中的类型替换 ,用RetryOnFailure选项实例化。